Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- Add explicit keep rules for RxJava `Result` types to prevent their generic information from being removed.
- Add `allowoptimization` flags for most kept types.
- Add `Invocation.annotationUrl` which returns the original URL from the method annotation.
- Add Kotlin `Flow` adapter with SSE support.

**Changed**

Expand Down
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ kotlinx-serialization-core = { module = "org.jetbrains.kotlinx:kotlinx-serializa
kotlinx-serialization-json = { module = "org.jetbrains.kotlinx:kotlinx-serialization-json", version.ref = "kotlinx-serialization" }
kotlinx-serialization-proto = { module = "org.jetbrains.kotlinx:kotlinx-serialization-protobuf", version.ref = "kotlinx-serialization" }
okhttp-client = { module = "com.squareup.okhttp3:okhttp", version.ref = "okhttp" }
okhttp-sse = { module = "com.squareup.okhttp3:okhttp-sse", version.ref = "okhttp" }
okhttp-loggingInterceptor = { module = "com.squareup.okhttp3:logging-interceptor", version.ref = "okhttp" }
okhttp-mockwebserver = { module = "com.squareup.okhttp3:mockwebserver", version.ref = "okhttp" }
junit = { module = "junit:junit", version = "4.13.2" }
Expand Down
93 changes: 93 additions & 0 deletions retrofit-adapters/kotlin-flow/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
Kotlin Flow Adapter
===================

A `CallAdapter.Factory` for adapting [Kotlin coroutine `Flow`][1] return types in Retrofit `suspend`
service methods.

Supported return types:

* `Flow<T>` — emits the single converted response body and completes.
* `Flow<ServerSentEvent>` (with `@Streaming`) — streams Server-Sent Events from the response body,
emitting one `ServerSentEvent` per event.


Usage
-----

Add `FlowCallAdapterFactory` as a call adapter when building your `Retrofit` instance:

```kotlin
val retrofit = Retrofit.Builder()
.baseUrl("https://example.com/")
.addCallAdapterFactory(FlowCallAdapterFactory.create())
.build()
```

### Regular body flow

Annotate a `suspend` service method with any Retrofit HTTP annotation and return `Flow<T>`. The flow
emits the single converted response body when collected, then completes. On a non-2xx response or a
network failure the flow fails with `HttpException` or `IOException` respectively.

```kotlin
interface MyService {
@GET("/user")
suspend fun getUser(): Flow<User>
}
```

### Server-Sent Events (SSE)

Add `@Streaming` to stream a response as [Server-Sent Events][2]. The return type must be
`Flow<ServerSentEvent>`. The flow emits one `ServerSentEvent` for each event dispatched by the
server and completes when the connection is closed. Cancelling the flow cancels the underlying
OkHttp `EventSource`.
Comment on lines +39 to +44

Copilot AI Apr 7, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR description/usage mentions an @SSE annotation and an in-module W3C-compliant SseParser, but this module’s public docs instruct using Retrofit’s @Streaming and state parsing is delegated to OkHttp okhttp-sse. Please reconcile the PR description and module documentation/implementation (either implement the promised @SSE + parser, or update the PR description to match the chosen @Streaming + OkHttp SSE approach).

Copilot uses AI. Check for mistakes.

```kotlin
interface MyService {
@Streaming
@GET("/events")
suspend fun events(): Flow<ServerSentEvent>
}
```

`ServerSentEvent` exposes the fields defined by the [W3C SSE specification][2]:

| Property | Type | Description |
| -------- | --------- | ---------------------------------------------------------- |
| `id` | `String?` | Last event ID, or `null` if not set. |
| `event` | `String?` | Event type, or `null` for the default `"message"` type. |
| `data` | `String` | Data payload; multiple `data:` lines are joined with `\n`. |

Parsing and connection management are delegated to OkHttp's `okhttp-sse` library. The `Accept:
text/event-stream` header is added automatically.


Download
--------

Download [the latest JAR][3] or grab via [Maven][4]:

```xml
<dependency>
<groupId>com.squareup.retrofit2</groupId>
<artifactId>adapter-kotlin-flow</artifactId>
<version>latest.version</version>
</dependency>
```

or [Gradle][4]:

```kotlin
implementation("com.squareup.retrofit2:adapter-kotlin-flow:latest.version")
```

Snapshots of the development version are available in [Sonatype's `snapshots` repository][snap].



[1]: https://kotlinlang.org/docs/flow.html
[2]: https://html.spec.whatwg.org/multipage/server-sent-events.html
[3]: https://search.maven.org/remote_content?g=com.squareup.retrofit2&a=adapter-kotlin-flow&v=LATEST
[4]: https://search.maven.org/search?q=g:com.squareup.retrofit2%20a:adapter-kotlin-flow
[snap]: https://s01.oss.sonatype.org/content/repositories/snapshots/
21 changes: 21 additions & 0 deletions retrofit-adapters/kotlin-flow/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
apply plugin: 'org.jetbrains.kotlin.jvm'
apply plugin: 'com.vanniktech.maven.publish'

dependencies {
api projects.retrofit
api libs.okhttp.sse
api libs.kotlinx.coroutines

compileOnly libs.findBugsAnnotations

testImplementation libs.junit
testImplementation libs.truth
testImplementation libs.okhttp.mockwebserver
testImplementation libs.kotlinx.coroutines
}

jar {
manifest {
attributes 'Automatic-Module-Name': 'retrofit2.adapter.kotlinflow'
}
}
3 changes: 3 additions & 0 deletions retrofit-adapters/kotlin-flow/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
POM_ARTIFACT_ID=adapter-kotlin-flow
POM_NAME=Adapter: Kotlin Flow
POM_DESCRIPTION=A Retrofit CallAdapter for Kotlin Flow, with support for Server-Sent Events (SSE).
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
/*
* Copyright (C) 2026 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package retrofit2.adapter.flow

import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import okhttp3.Request
import okhttp3.ResponseBody
import okhttp3.sse.EventSource
import okhttp3.sse.EventSourceListener
import okhttp3.sse.EventSources
import okio.Timeout
import retrofit2.Call
import retrofit2.CallAdapter
import retrofit2.Callback
import retrofit2.HttpException
import retrofit2.Response
import retrofit2.Retrofit
import retrofit2.http.Streaming

/**
* A [CallAdapter.Factory] that supports [Flow] as a **suspend** service-method return type.
*
* ## SSE (Server-Sent Events)
*
* When the method is also annotated with [@Streaming][retrofit2.http.Streaming], the adapter
* streams the HTTP response body as Server-Sent Events, emitting each parsed [ServerSentEvent] to
* the flow:
*
* ```kotlin
* interface Service {
* @Streaming
* @GET("events")
* suspend fun events(): Flow<ServerSentEvent>
* }
* ```
*
* Register this factory with [Retrofit.Builder.addCallAdapterFactory]:
*
* ```kotlin
* val retrofit = Retrofit.Builder()
* .baseUrl(baseUrl)
* .addCallAdapterFactory(FlowCallAdapterFactory.create())
* .build()
* ```
*
* ## Non-SSE flows
*
* Without [@Streaming][retrofit2.http.Streaming], a `suspend fun foo(): Flow<T>` return type will
* emit a single converted response body (like a regular body call) and complete, or fail with
* [HttpException] / [java.io.IOException] as appropriate.
*
* ```kotlin
* interface Service {
* @GET("user")
* suspend fun getUser(): Flow<String>
* }
* ```
*/
class FlowCallAdapterFactory private constructor() : CallAdapter.Factory() {

companion object {
@JvmStatic
fun create(): FlowCallAdapterFactory = FlowCallAdapterFactory()
}

override fun get(
returnType: Type,
annotations: Array<Annotation>,
retrofit: Retrofit,
): CallAdapter<*, *>? {
val isStreaming = annotations.any { it is Streaming }

if (getRawType(returnType) != Call::class.java) return null
if (returnType !is ParameterizedType) return null
val callType = getParameterUpperBound(0, returnType)
if (getRawType(callType) != Flow::class.java) return null
if (callType !is ParameterizedType) {
error(
"Flow return type must be parameterized as Flow<Foo> or Flow<? extends Foo>"
)
}
val elementType = getParameterUpperBound(0, callType)
val responseType = if (isStreaming) ResponseBody::class.java else elementType
val eventSourceFactory = if (isStreaming) EventSources.createFactory(retrofit.callFactory()) else null
return SuspendFlowCallAdapter<Any>(responseType, isStreaming, eventSourceFactory)
Comment on lines +98 to +107

Copilot AI Apr 7, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When isStreaming is true, responseType is forced to ResponseBody and the adapter will produce an SSE Flow<ServerSentEvent> regardless of the declared Flow<T> element type. This can let @Streaming suspend fun foo(): Flow<Foo> compile but emit ServerSentEvent at runtime (type mismatch). Consider validating that elementType is ServerSentEvent (or otherwise constrain the streaming path) and throw a clear error if it is not.

Copilot uses AI. Check for mistakes.
}
}

// ---------------------------------------------------------------------------
// Suspend adapter: adapt(Call<R>) → Call<Flow<R>> (or Call<Flow<ServerSentEvent>> for SSE)
//
// Retrofit's SuspendForBody calls callAdapter.adapt(call) expecting a Call<ResponseT>, then
// calls KotlinExtensions.await() on it. We return a lightweight wrapper Call that, when
// enqueued, immediately delivers a cold Flow as the response body without starting the HTTP
// request yet. The actual HTTP request is deferred until the flow is collected.
// ---------------------------------------------------------------------------

private class SuspendFlowCallAdapter<R>(
private val _responseType: Type,
private val isStreaming: Boolean,
private val eventSourceFactory: EventSource.Factory?,
) : CallAdapter<R, Call<Flow<*>>> {

override fun responseType(): Type = _responseType

override fun adapt(call: Call<R>): Call<Flow<*>> {
val flow: Flow<*> =
if (isStreaming) {
streamingFlow(call.request(), requireNotNull(eventSourceFactory))
} else {
bodyFlow(call)
}
return FlowAsCall(call, flow)
}
}

/**
* A [Call] whose "response body" is a pre-built cold [Flow]. When enqueued it immediately
* delivers the flow to the callback so that Retrofit's suspend machinery can resume the coroutine
* with the flow value. The HTTP request is only started when the returned flow is collected.
*/
private class FlowAsCall<R>(
private val delegate: Call<R>,
private val flow: Flow<*>,
) : Call<Flow<*>> {

override fun enqueue(callback: Callback<Flow<*>>) {
callback.onResponse(this, Response.success(flow))
}

override fun execute(): Response<Flow<*>> = Response.success(flow)

override fun isExecuted(): Boolean = delegate.isExecuted

override fun cancel() = delegate.cancel()

override fun isCanceled(): Boolean = delegate.isCanceled

override fun clone(): Call<Flow<*>> = FlowAsCall(delegate.clone(), flow)

override fun request(): Request = delegate.request()

override fun timeout(): Timeout = delegate.timeout()
}

// ---------------------------------------------------------------------------
// Flow builders
// ---------------------------------------------------------------------------

/**
* Returns a cold [Flow] that, when collected, opens an OkHttp [EventSource] for the given
* [request] and emits each parsed [ServerSentEvent]. The connection is closed when the stream ends
* or the flow is cancelled.
*/
private fun streamingFlow(
request: Request,
eventSourceFactory: EventSource.Factory,
): Flow<ServerSentEvent> = callbackFlow {
val eventSource =
eventSourceFactory.newEventSource(
request,
object : EventSourceListener() {
override fun onEvent(
eventSource: EventSource,
id: String?,
type: String?,
data: String,
) {
trySend(ServerSentEvent(id = id, event = type, data = data))
}
Comment on lines +170 to +176

Copilot AI Apr 7, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

onEvent uses trySend(...) but ignores the result. If the channel buffer is full or closed, events can be dropped silently, which breaks streaming semantics. Handle the ChannelResult (e.g., close with an error on buffer overflow) or switch to a sending approach that provides backpressure without dropping (such as sending from a launched coroutine).

Copilot uses AI. Check for mistakes.

override fun onClosed(eventSource: EventSource) {
close()
}

override fun onFailure(
eventSource: EventSource,
t: Throwable?,
response: okhttp3.Response?,
) {
close(
t
?: response?.let {
HttpException(Response.error<Nothing>(it.body, it))
}
)
}
},
)
awaitClose { eventSource.cancel() }
}

/**
* Returns a cold [Flow] that, when collected, makes the HTTP call, emits the single converted
* response body, and completes. Errors result in [HttpException] or [java.io.IOException].
*/
private fun <R> bodyFlow(call: Call<R>): Flow<R> = callbackFlow {
call.clone().enqueue(
object : Callback<R> {
override fun onResponse(call: Call<R>, response: Response<R>) {
if (!response.isSuccessful) {
close(HttpException(response))
return
}
val body = response.body()
if (body == null) {
close()
return
Comment on lines +216 to +230

Copilot AI Apr 7, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a successful HTTP response with a null body, Retrofit's existing suspend handling throws a KotlinNullPointerException with a message that includes the service + method name (via Invocation tag). This adapter currently throws a plain NullPointerException("Response body of a suspend fun was null"), which is less actionable and inconsistent with the rest of the codebase. Consider mirroring KotlinExtensions.await()'s null-body exception/message here.

Copilot uses AI. Check for mistakes.
}
trySend(body)
close()
}

override fun onFailure(call: Call<R>, t: Throwable) {
close(t)
}
}
)

awaitClose { call.cancel() }
}

Loading