Add Kotlin Flow adapter with SSE support#7
Conversation
Agent-Logs-Url: https://github.com/Goooler/retrofit/sessions/3633ae89-651c-4c0d-a27d-8dce1f512c5e Co-authored-by: Goooler <10363352+Goooler@users.noreply.github.com>
Agent-Logs-Url: https://github.com/Goooler/retrofit/sessions/3633ae89-651c-4c0d-a27d-8dce1f512c5e Co-authored-by: Goooler <10363352+Goooler@users.noreply.github.com>
Agent-Logs-Url: https://github.com/Goooler/retrofit/sessions/3633ae89-651c-4c0d-a27d-8dce1f512c5e Co-authored-by: Goooler <10363352+Goooler@users.noreply.github.com>
|
@copilot I think the function should be 'suspend' only; we shouldn't support normal functions. |
Agent-Logs-Url: https://github.com/Goooler/retrofit/sessions/4230e23e-623b-409b-8556-2eca270a1ce9 Co-authored-by: Goooler <10363352+Goooler@users.noreply.github.com>
Done in ac2670d. The factory now only handles |
There was a problem hiding this comment.
Pull request overview
This PR adds a new retrofit-adapters:kotlin-flow module which introduces a Retrofit CallAdapter.Factory to support kotlinx.coroutines.flow.Flow as the return type for Kotlin suspend service methods, including an SSE streaming mode.
Changes:
- Registers a new
retrofit-adapters:kotlin-flowGradle module (build + publishing metadata) and wires it into the build. - Introduces
FlowCallAdapterFactorywith two modes: single-bodyFlow<T>and SSE streaming via OkHttpokhttp-sse. - Adds initial unit tests for body flows and SSE flows, plus documentation and changelog entry.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
settings.gradle |
Includes the new :retrofit-adapters:kotlin-flow module in the build. |
retrofit-adapters/kotlin-flow/src/main/java/retrofit2/adapter/flow/FlowCallAdapterFactory.kt |
Implements the Flow call adapter (single-body and SSE streaming). |
retrofit-adapters/kotlin-flow/src/main/java/retrofit2/adapter/flow/ServerSentEvent.kt |
Adds an SSE event model type exposed by the adapter. |
retrofit-adapters/kotlin-flow/src/test/java/retrofit2/adapter/flow/FlowCallAdapterFactoryTest.kt |
Adds unit tests for body and SSE flows using MockWebServer. |
retrofit-adapters/kotlin-flow/README.md |
Documents usage and behavior of the new adapter module. |
retrofit-adapters/kotlin-flow/build.gradle |
Adds module dependencies (Retrofit, coroutines, okhttp-sse) and jar metadata. |
retrofit-adapters/kotlin-flow/gradle.properties |
Provides Maven publishing coordinates and module description. |
gradle/libs.versions.toml |
Adds okhttp-sse to the version catalog for dependency management. |
CHANGELOG.md |
Notes the addition of the Kotlin Flow adapter with SSE support. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| call | ||
| .clone() | ||
| .enqueue( | ||
| object : Callback<R> { | ||
| override fun onResponse(call: Call<R>, response: Response<R>) { | ||
| if (!response.isSuccessful) { | ||
| close(HttpException(response)) | ||
| return | ||
| } | ||
| val body = response.body() | ||
| if (body == null) { | ||
| close() | ||
| return | ||
| } | ||
| trySend(body) | ||
| close() | ||
| } | ||
|
|
||
| override fun onFailure(call: Call<R>, t: Throwable) { | ||
| close(t) | ||
| } | ||
| } | ||
| ) | ||
|
|
||
| awaitClose { call.cancel() } |
There was a problem hiding this comment.
bodyFlow enqueues a cloned call (call.clone().enqueue(...)) but the awaitClose block cancels the original call, not the cloned/enqueued instance. This means canceling the returned Flow will not cancel the in-flight HTTP request. Capture the cloned call in a local variable and cancel that in awaitClose (and use the same instance for enqueue).
| call | |
| .clone() | |
| .enqueue( | |
| object : Callback<R> { | |
| override fun onResponse(call: Call<R>, response: Response<R>) { | |
| if (!response.isSuccessful) { | |
| close(HttpException(response)) | |
| return | |
| } | |
| val body = response.body() | |
| if (body == null) { | |
| close() | |
| return | |
| } | |
| trySend(body) | |
| close() | |
| } | |
| override fun onFailure(call: Call<R>, t: Throwable) { | |
| close(t) | |
| } | |
| } | |
| ) | |
| awaitClose { call.cancel() } | |
| val flowCall = call.clone() | |
| flowCall.enqueue( | |
| object : Callback<R> { | |
| override fun onResponse(call: Call<R>, response: Response<R>) { | |
| if (!response.isSuccessful) { | |
| close(HttpException(response)) | |
| return | |
| } | |
| val body = response.body() | |
| if (body == null) { | |
| close() | |
| return | |
| } | |
| trySend(body) | |
| close() | |
| } | |
| override fun onFailure(call: Call<R>, t: Throwable) { | |
| close(t) | |
| } | |
| } | |
| ) | |
| awaitClose { flowCall.cancel() } |
| val elementType = getParameterUpperBound(0, callType) | ||
| val responseType = if (isStreaming) ResponseBody::class.java else elementType | ||
| val eventSourceFactory = | ||
| if (isStreaming) EventSources.createFactory(retrofit.callFactory()) else null | ||
| return SuspendFlowCallAdapter<Any>(responseType, isStreaming, eventSourceFactory) |
There was a problem hiding this comment.
When isStreaming is true, responseType is forced to ResponseBody and the adapter will produce an SSE Flow<ServerSentEvent> regardless of the declared Flow<T> element type. This can let @Streaming suspend fun foo(): Flow<Foo> compile but emit ServerSentEvent at runtime (type mismatch). Consider validating that elementType is ServerSentEvent (or otherwise constrain the streaming path) and throw a clear error if it is not.
| override fun onEvent(eventSource: EventSource, id: String?, type: String?, data: String) { | ||
| trySend(ServerSentEvent(id = id, event = type, data = data)) | ||
| } |
There was a problem hiding this comment.
onEvent uses trySend(...) but ignores the result. If the channel buffer is full or closed, events can be dropped silently, which breaks streaming semantics. Handle the ChannelResult (e.g., close with an error on buffer overflow) or switch to a sending approach that provides backpressure without dropping (such as sending from a launched coroutine).
| val body = response.body() | ||
| if (body == null) { | ||
| close() | ||
| return | ||
| } |
There was a problem hiding this comment.
For successful responses with a null body (e.g., 204), this flow currently completes without emitting anything. That makes suspend fun foo(): Flow<T> behave like an empty stream rather than failing like Retrofit’s non-nullable suspend fun foo(): T does. Consider failing the flow when body == null (or explicitly documenting/handling a Flow<Unit>/nullable body case) to avoid silent data loss.
| ### Server-Sent Events (SSE) | ||
|
|
||
| Add `@Streaming` to stream a response as [Server-Sent Events][2]. The return type must be | ||
| `Flow<ServerSentEvent>`. The flow emits one `ServerSentEvent` for each event dispatched by the | ||
| server and completes when the connection is closed. Cancelling the flow cancels the underlying | ||
| OkHttp `EventSource`. |
There was a problem hiding this comment.
PR description/usage mentions an @SSE annotation and an in-module W3C-compliant SseParser, but this module’s public docs instruct using Retrofit’s @Streaming and state parsing is delegated to OkHttp okhttp-sse. Please reconcile the PR description and module documentation/implementation (either implement the promised @SSE + parser, or update the PR description to match the chosen @Streaming + OkHttp SSE approach).
| /** | ||
| * Represents a single Server-Sent Event as defined by the | ||
| * [W3C SSE specification](https://html.spec.whatwg.org/multipage/server-sent-events.html). | ||
| * | ||
| * @property id The last event ID, or `null` if none was set. | ||
| * @property event The event type, or `null` if the default "message" type. | ||
| * @property data The data payload. Multiple `data:` lines are joined with `\n`. | ||
| */ | ||
| data class ServerSentEvent(val id: String?, val event: String?, val data: String) |
There was a problem hiding this comment.
The PR description says ServerSentEvent follows the SSE spec with id, event, data, and retry fields, but this data class currently exposes only id, event, and data. Either add the missing retry field (and ensure it’s populated) or update the PR description/docs to match the actual API surface.
|
@copilot apply changes based on the comments in this thread |
Agent-Logs-Url: https://github.com/Goooler/retrofit/sessions/9b23277e-adee-427c-bf19-4ca25610506c Co-authored-by: Goooler <10363352+Goooler@users.noreply.github.com>
Applied all four actionable changes in 0db231b:
Two new tests were added: |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| val body = response.body() | ||
| if (body == null) { | ||
| close(NullPointerException("Response body of a suspend fun was null")) | ||
| return |
There was a problem hiding this comment.
For a successful HTTP response with a null body, Retrofit's existing suspend handling throws a KotlinNullPointerException with a message that includes the service + method name (via Invocation tag). This adapter currently throws a plain NullPointerException("Response body of a suspend fun was null"), which is less actionable and inconsistent with the rest of the codebase. Consider mirroring KotlinExtensions.await()'s null-body exception/message here.
| t: Throwable?, | ||
| response: okhttp3.Response?, | ||
| ) { | ||
| close(t ?: response?.let { HttpException(Response.error<Nothing>(it.body, it)) }) |
There was a problem hiding this comment.
Response.error(body, rawResponse) requires a non-null body and a non-successful rawResponse. If OkHttp-SSE invokes onFailure with a successful response (e.g., protocol/content-type issues) or a null body, this will throw from the callback thread instead of failing the Flow. Handle t first, then only build HttpException when response != null && !response.isSuccessful, otherwise close with an IOException/IllegalStateException describing the SSE failure.
| close(t ?: response?.let { HttpException(Response.error<Nothing>(it.body, it)) }) | |
| val failure = | |
| when { | |
| t != null -> t | |
| response != null && !response.isSuccessful -> HttpException(response) | |
| response != null -> | |
| java.io.IOException( | |
| "SSE stream failed with unexpected successful response: ${response.code}", | |
| ) | |
| else -> IllegalStateException("SSE stream failed without throwable or response") | |
| } | |
| close(failure) |
| fail("Expected NullPointerException") | ||
| } catch (_: NullPointerException) { | ||
| // expected |
There was a problem hiding this comment.
This test asserts a NullPointerException for the 204/null-body case. If the adapter is updated to mirror Retrofit's suspend behavior (throwing KotlinNullPointerException with a service/method-specific message), update the expected exception type/message here so the test reflects the intended compatibility contract.
| fail("Expected NullPointerException") | |
| } catch (_: NullPointerException) { | |
| // expected | |
| fail("Expected KotlinNullPointerException") | |
| } catch (e: KotlinNullPointerException) { | |
| assertThat(e).hasMessageThat().isEqualTo( | |
| "Response from retrofit2.adapter.flow.FlowCallAdapterFactoryTest\$Service.body was null but response body type was declared as non-null", | |
| ) |
|
@copilot apply changes based on the comments in this thread |
Agent-Logs-Url: https://github.com/Goooler/retrofit/sessions/bf16f351-1ef4-4553-8caa-fc672d63dd47 Co-authored-by: Goooler <10363352+Goooler@users.noreply.github.com>
Applied all three changes in e36eb1b:
|
Adds a new
retrofit-adapters/kotlin-flowmodule that enableskotlinx.coroutines.flow.Flowas a suspend Retrofit service method return type, with built-in Server-Sent Events (SSE) streaming support via OkHttpokhttp-sse.Usage
Changes
retrofit-adapters/kotlin-flow/– new Gradle module registered insettings.gradleServerSentEvent– data class exposingid,event, anddatafields; SSE parsing and connection management are delegated to OkHttpokhttp-sseFlowCallAdapterFactory– handles onlysuspend fun foo(): Flow<T>return types (non-suspendFlow<T>is not supported)@Streamingannotation; the return type must beFlow<ServerSentEvent>— a clear error is thrown at call time if a different element type is usedEventSourceand emit parsedServerSentEventobjects; theEventSourceis cancelled when the flow is cancelled or the channel buffer overflowsonFailurein the SSE listener uses safe structured branching: propagates the throwable if present, wraps non-successful responses inHttpException, reports unexpected successful responses asIOException, and falls back toIllegalStateExceptionwhen no info is availableKotlinNullPointerExceptioncarrying the service + method name, consistent withKotlinExtensions.await()in Retrofit coreFlowAsCallwrapper defers the HTTP request until the returnedFlowis collected; flow cancellation correctly cancels the in-flight HTTP callFlowCallAdapterFactoryTestcovering SSE events, multiline data, retry fields, error handling, body flows, null-body failure (asserting the exactKotlinNullPointerExceptionmessage), and wrong-element-type validationCHANGELOG.md's "Unreleased" section has been updated, if applicable.