diff --git a/retrofit-mock/src/main/java/retrofit2/mock/BehaviorDelegate.java b/retrofit-mock/src/main/java/retrofit2/mock/BehaviorDelegate.java index ed5d3b1abf..d506dbbce1 100644 --- a/retrofit-mock/src/main/java/retrofit2/mock/BehaviorDelegate.java +++ b/retrofit-mock/src/main/java/retrofit2/mock/BehaviorDelegate.java @@ -77,13 +77,9 @@ public T returning(Call call) { Call adaptedCall = (Call) adapted; Continuation continuation = (Continuation) args[args.length - 1]; - try { - return adapterInfo.wantsResponse - ? KotlinExtensions.awaitResponse(adaptedCall, continuation) - : KotlinExtensions.await(adaptedCall, continuation); - } catch (Exception e) { - return KotlinExtensions.suspendAndThrow(e, continuation); - } + return adapterInfo.wantsResponse + ? KotlinExtensions.awaitResponse(adaptedCall, continuation) + : KotlinExtensions.await(adaptedCall, continuation); }); } diff --git a/retrofit/kotlin-test/src/test/java/retrofit2/KotlinSuspendTest.kt b/retrofit/kotlin-test/src/test/java/retrofit2/KotlinSuspendTest.kt index 260fd7ab98..339c768f37 100644 --- a/retrofit/kotlin-test/src/test/java/retrofit2/KotlinSuspendTest.kt +++ b/retrofit/kotlin-test/src/test/java/retrofit2/KotlinSuspendTest.kt @@ -15,8 +15,15 @@ */ package retrofit2 +import java.io.IOException +import java.lang.reflect.ParameterizedType +import java.lang.reflect.Type +import java.util.concurrent.Executors +import kotlin.coroutines.CoroutineContext import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Runnable +import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.async import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext @@ -35,10 +42,6 @@ import retrofit2.helpers.ToStringConverterFactory import retrofit2.http.GET import retrofit2.http.HEAD import retrofit2.http.Path -import java.io.IOException -import java.lang.reflect.ParameterizedType -import java.lang.reflect.Type -import kotlin.coroutines.CoroutineContext class KotlinSuspendTest { @get:Rule val server = MockWebServer() @@ -353,6 +356,35 @@ class KotlinSuspendTest { } } + @Test fun usesCoroutineContextForCallFactory() { + val executor = Executors.newSingleThreadExecutor() + val okHttpClient = OkHttpClient() + var callFactoryThread: Thread? = null + val outerContextThread: Thread + val retrofit = Retrofit.Builder() + .baseUrl(server.url("/")) + .callFactory { + callFactoryThread = Thread.currentThread() + okHttpClient.newCall(it) + } + .addConverterFactory(ToStringConverterFactory()) + .coroutineDispatcher(executor.asCoroutineDispatcher()) + .build() + val example = retrofit.create(Service::class.java) + + server.enqueue(MockResponse().setBody("Hi")) + + runBlocking { + outerContextThread = Thread.currentThread() + example.body() + } + + assertThat(callFactoryThread).isNotNull + assertThat(outerContextThread).isNotEqualTo(callFactoryThread) + executor.shutdownNow() + } + + @Suppress("EXPERIMENTAL_OVERRIDE") private object DirectUnconfinedDispatcher : CoroutineDispatcher() { override fun isDispatchNeeded(context: CoroutineContext): Boolean = false diff --git a/retrofit/src/main/java/retrofit2/HttpServiceMethod.java b/retrofit/src/main/java/retrofit2/HttpServiceMethod.java index 2ca5c7f0ae..aaa9061c79 100644 --- a/retrofit/src/main/java/retrofit2/HttpServiceMethod.java +++ b/retrofit/src/main/java/retrofit2/HttpServiceMethod.java @@ -192,12 +192,7 @@ protected Object adapt(Call call, Object[] args) { Continuation> continuation = (Continuation>) args[args.length - 1]; - // See SuspendForBody for explanation about this try/catch. - try { - return KotlinExtensions.awaitResponse(call, continuation); - } catch (Exception e) { - return KotlinExtensions.suspendAndThrow(e, continuation); - } + return KotlinExtensions.awaitResponse(call, continuation); } } @@ -226,25 +221,13 @@ protected Object adapt(Call call, Object[] args) { //noinspection unchecked Checked by reflection inside RequestFactory. Continuation continuation = (Continuation) args[args.length - 1]; - // Calls to OkHttp Call.enqueue() like those inside await and awaitNullable can sometimes - // invoke the supplied callback with an exception before the invoking stack frame can return. - // Coroutines will intercept the subsequent invocation of the Continuation and throw the - // exception synchronously. A Java Proxy cannot throw checked exceptions without them being - // declared on the interface method. To avoid the synchronous checked exception being wrapped - // in an UndeclaredThrowableException, it is intercepted and supplied to a helper which will - // force suspension to occur so that it can be instead delivered to the continuation to - // bypass this restriction. - try { - if (isUnit) { - //noinspection unchecked Checked by isUnit - return KotlinExtensions.awaitUnit((Call) call, (Continuation) continuation); - } else if (isNullable) { - return KotlinExtensions.awaitNullable(call, continuation); - } else { - return KotlinExtensions.await(call, continuation); - } - } catch (Exception e) { - return KotlinExtensions.suspendAndThrow(e, continuation); + if (isUnit) { + //noinspection unchecked Checked by isUnit + return KotlinExtensions.awaitUnit((Call) call, (Continuation) continuation); + } else if (isNullable) { + return KotlinExtensions.await(call, continuation); + } else { + return KotlinExtensions.await(call, continuation); } } } diff --git a/retrofit/src/main/java/retrofit2/KotlinExtensions.kt b/retrofit/src/main/java/retrofit2/KotlinExtensions.kt index d334b25136..224b8fdd44 100644 --- a/retrofit/src/main/java/retrofit2/KotlinExtensions.kt +++ b/retrofit/src/main/java/retrofit2/KotlinExtensions.kt @@ -20,67 +20,70 @@ package retrofit2 import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.suspendCancellableCoroutine -import java.lang.reflect.ParameterizedType -import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED -import kotlin.coroutines.intrinsics.intercepted -import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn +import kotlinx.coroutines.withContext import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException inline fun Retrofit.create(): T = create(T::class.java) suspend fun Call.await(): T { - return suspendCancellableCoroutine { continuation -> - continuation.invokeOnCancellation { - cancel() - } - enqueue(object : Callback { - override fun onResponse(call: Call, response: Response) { - if (response.isSuccessful) { - val body = response.body() - if (body == null) { - val invocation = call.request().tag(Invocation::class.java)!! - val method = invocation.method() - val e = KotlinNullPointerException("Response from " + + // TODO: a better solution for off-main-thread call factories than this. + return withContext(Dispatchers.Default) { + suspendCancellableCoroutine { continuation -> + continuation.invokeOnCancellation { + cancel() + } + enqueue(object : Callback { + override fun onResponse(call: Call, response: Response) { + if (response.isSuccessful) { + val body = response.body() + if (body == null) { + val invocation = call.request().tag(Invocation::class.java)!! + val method = invocation.method() + val e = KotlinNullPointerException("Response from " + method.declaringClass.name + '.' + method.name + " was null but response body type was declared as non-null") - continuation.resumeWithException(e) + continuation.resumeWithException(e) + } else { + continuation.resume(body) + } } else { - continuation.resume(body) + continuation.resumeWithException(HttpException(response)) } - } else { - continuation.resumeWithException(HttpException(response)) } - } - override fun onFailure(call: Call, t: Throwable) { - continuation.resumeWithException(t) - } - }) + override fun onFailure(call: Call, t: Throwable) { + continuation.resumeWithException(t) + } + }) + } } } @JvmName("awaitNullable") suspend fun Call.await(): T? { - return suspendCancellableCoroutine { continuation -> - continuation.invokeOnCancellation { - cancel() - } - enqueue(object : Callback { - override fun onResponse(call: Call, response: Response) { - if (response.isSuccessful) { - continuation.resume(response.body()) - } else { - continuation.resumeWithException(HttpException(response)) - } + // TODO: a better solution for off-main-thread call factories than this. + return withContext(Dispatchers.Default) { + suspendCancellableCoroutine { continuation -> + continuation.invokeOnCancellation { + cancel() } + enqueue(object : Callback { + override fun onResponse(call: Call, response: Response) { + if (response.isSuccessful) { + continuation.resume(response.body()) + } else { + continuation.resumeWithException(HttpException(response)) + } + } - override fun onFailure(call: Call, t: Throwable) { - continuation.resumeWithException(t) - } - }) + override fun onFailure(call: Call, t: Throwable) { + continuation.resumeWithException(t) + } + }) + } } } @@ -91,36 +94,21 @@ suspend fun Call.await() { } suspend fun Call.awaitResponse(): Response { - return suspendCancellableCoroutine { continuation -> - continuation.invokeOnCancellation { - cancel() - } - enqueue(object : Callback { - override fun onResponse(call: Call, response: Response) { - continuation.resume(response) + // TODO: a better solution for off-main-thread call factories than this. + return withContext(Dispatchers.Default) { + suspendCancellableCoroutine { continuation -> + continuation.invokeOnCancellation { + cancel() } + enqueue(object : Callback { + override fun onResponse(call: Call, response: Response) { + continuation.resume(response) + } - override fun onFailure(call: Call, t: Throwable) { - continuation.resumeWithException(t) - } - }) - } -} - -/** - * Force the calling coroutine to suspend before throwing [this]. - * - * This is needed when a checked exception is synchronously caught in a [java.lang.reflect.Proxy] - * invocation to avoid being wrapped in [java.lang.reflect.UndeclaredThrowableException]. - * - * The implementation is derived from: - * https://github.com/Kotlin/kotlinx.coroutines/pull/1667#issuecomment-556106349 - */ -internal suspend fun Exception.suspendAndThrow(): Nothing { - suspendCoroutineUninterceptedOrReturn { continuation -> - Dispatchers.Default.dispatch(continuation.context) { - continuation.intercepted().resumeWithException(this@suspendAndThrow) + override fun onFailure(call: Call, t: Throwable) { + continuation.resumeWithException(t) + } + }) } - COROUTINE_SUSPENDED } }