
Bug: createStreamingAzureResponse buffers all deltas and emits only at completion (no true streaming) #48726

@jega-ms


Artifact

com.azure:azure-ai-agents:2.0.0
com.azure:azure-ai-agents:2.0.0-beta3

Description

When using responsesAsyncClient.createStreamingAzureResponse, the returned Flux does not stream incremental response events as expected.
Instead of emitting tokens/deltas progressively, it buffers all response events and delivers them only after the request completes, so the call behaves like a non-streaming one.
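To make the difference concrete, here is a minimal, self-contained Java sketch. It is not SDK code, and the class `DeliveryModes` and its methods are hypothetical. It records the order in which deltas are produced and received. With pass-through delivery, each `received` entry directly follows its `produced` entry. With buffered delivery, which matches the behavior reported here, every `received` entry comes after the last `produced` one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration, not SDK code: pass-through vs. buffered delivery.
class DeliveryModes {

    // Streaming: hand each delta to the subscriber as soon as it is produced.
    static void streaming(List<String> deltas, List<String> log, Consumer<String> subscriber) {
        for (String d : deltas) {
            log.add("produced " + d);
            subscriber.accept(d);
        }
    }

    // Buffered: hold every delta until the source completes, then replay them all.
    static void buffered(List<String> deltas, List<String> log, Consumer<String> subscriber) {
        List<String> buffer = new ArrayList<>();
        for (String d : deltas) {
            log.add("produced " + d);
            buffer.add(d);
        }
        buffer.forEach(subscriber);
    }

    // Runs one mode and returns the interleaved produced/received log.
    static List<String> trace(boolean stream, List<String> deltas) {
        List<String> log = new ArrayList<>();
        Consumer<String> subscriber = d -> log.add("received " + d);
        if (stream) {
            streaming(deltas, log, subscriber);
        } else {
            buffered(deltas, log, subscriber);
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> deltas = List.of("A", "B", "C");
        System.out.println("streaming: " + trace(true, deltas));
        System.out.println("buffered:  " + trace(false, deltas));
    }
}
```

In both modes the subscriber eventually sees the same items. Only the interleaving (and, with a real network source, the arrival time) differs, which is why the symptom looks like a plain non-streaming call.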

Expected Behavior

  • The Flux should emit each event in real time, as soon as it is received
  • onNext() should be called repeatedly as tokens/deltas are generated
  • The stream should be suitable for reactive pass-through (e.g., WebFlux / SSE)
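At the wire level, assume the service sends the response as Server-Sent Events, as the OpenAI Responses API does when streaming. Under that assumption, "real time" means each event can be handed to the subscriber as soon as its terminating blank line is read. The sketch below is a minimal SSE reader built on that assumption. `SseReader` and `SseEvent` are hypothetical names, not SDK types, and the sample payloads are trimmed for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.function.Consumer;

// Minimal Server-Sent Events reader: a sketch, not the SDK's implementation.
class SseReader {

    // One parsed event: the optional "event:" name and the joined "data:" lines.
    record SseEvent(String name, String data) {}

    // Returns the field value after "prefix", dropping one leading space as SSE allows.
    private static String value(String line, String prefix) {
        String v = line.substring(prefix.length());
        return v.startsWith(" ") ? v.substring(1) : v;
    }

    // Dispatches each event as soon as its terminating blank line is read,
    // instead of waiting for the whole body to be consumed.
    static void read(Reader source, Consumer<SseEvent> onEvent) throws IOException {
        BufferedReader reader = new BufferedReader(source);
        String name = null;
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        String line;
        while ((line = reader.readLine()) != null) {
            if (line.isEmpty()) {
                if (hasData) {
                    onEvent.accept(new SseEvent(name, data.toString()));
                }
                name = null;
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith(":")) {
                continue; // comment or keep-alive line
            } else if (line.startsWith("event:")) {
                name = value(line, "event:");
            } else if (line.startsWith("data:")) {
                if (hasData) {
                    data.append('\n');
                }
                data.append(value(line, "data:"));
                hasData = true;
            }
            // Other fields (id:, retry:) are ignored in this sketch.
        }
        // An unterminated event at end of input is discarded, as in the SSE spec.
    }

    public static void main(String[] args) throws IOException {
        String body = "event: response.output_text.delta\n"
                + "data: {\"delta\":\"Hel\"}\n\n"
                + ": keep-alive\n\n"
                + "event: response.output_text.delta\n"
                + "data: {\"delta\":\"lo\"}\n\n";
        read(new StringReader(body), e -> System.out.println(e.name() + " -> " + e.data()));
    }
}
```

With a `StringReader`, the whole body is already in memory. When the `Reader` wraps a network stream, `readLine()` returns as lines arrive, so `onEvent` fires once per event rather than once at the end.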

Actual Behavior

  • No intermediate events are received while the response is being generated
  • All ResponseStreamEvent items are emitted together at the end
  • The observed behavior is effectively a batched response, not a stream

Reproduction Code

responsesAsyncClient.createStreamingAzureResponse(request)
    .doOnNext(event -> {
        System.out.println("Received event: " + event);
    })
    .doOnComplete(() -> {
        System.out.println("Stream completed");
    })
    .blockLast();

Sample Test Code

✅ Kotlin Streaming Test (Env-based Configuration)

import com.azure.ai.agents.AgentsClientBuilder
import com.azure.ai.agents.models.AgentReference
import com.azure.ai.agents.models.AzureCreateResponseOptions
import com.azure.core.credential.TokenCredential
import com.azure.identity.ClientSecretCredentialBuilder
import com.openai.models.ChatModel
import com.openai.models.responses.ResponseCreateParams
import com.openai.models.responses.ResponseStreamEvent
import reactor.core.publisher.Flux

fun main() {

    // 🔐 Read from ENV
    val endpoint = getEnv("AZURE_AI_ENDPOINT")
    val tenantId = getEnv("AZURE_TENANT_ID")
    val clientId = getEnv("AZURE_CLIENT_ID")
    val clientSecret = getEnv("AZURE_CLIENT_SECRET")
    val agentId = getEnv("AZURE_AGENT_ID")
    val model = System.getenv("AZURE_MODEL") ?: "gpt-4o"
    val conversationId = System.getenv("AZURE_CONVERSATION_ID") // optional

    // 🔑 Credential
    val credential: TokenCredential = ClientSecretCredentialBuilder()
        .tenantId(tenantId)
        .clientId(clientId)
        .clientSecret(clientSecret)
        .build()

    // 🤖 Client
    val client = AgentsClientBuilder()
        .endpoint(endpoint)
        .credential(credential)
        .buildResponsesAsyncClient()

    // 🧾 Request
    val requestBuilder = ResponseCreateParams.builder()
        .input("Write a detailed explanation about artificial intelligence in multiple paragraphs.")
        .model(ChatModel.of(model))

    if (!conversationId.isNullOrBlank()) {
        requestBuilder.conversation(conversationId)
    }

    val request = requestBuilder.build()

    val agentReference = AgentReference(agentId)
    val options = AzureCreateResponseOptions()
        .setAgentReference(agentReference)

    val startTime = System.currentTimeMillis()

    println("🚀 Starting streaming request...\n")

    val stream: Flux<ResponseStreamEvent> =
        client.createStreamingAzureResponse(options, request)

    stream
        .doOnNext { event ->
            val now = System.currentTimeMillis()

            // 🔍 Print event type
            println("[${now - startTime} ms] Event: ${event.javaClass.simpleName}")

            // (Optional) Print the text delta if this is an output-text delta event.
            // ResponseStreamEvent is a single union wrapper class, so looking up a
            // "delta" field by reflection never matches; use the typed accessor.
            event.outputTextDelta().ifPresent { textDelta ->
                println("   ↳ Δ: ${textDelta.delta()}")
            }
        }
        .doOnComplete {
            val total = System.currentTimeMillis() - startTime
            println("\n✅ Stream completed in ${total} ms")
        }
        .doOnError { error ->
            println("\n❌ Error: ${error.message}")
            error.printStackTrace()
        }
        .blockLast()
}

/**
 * Helper to ensure required ENV variables exist
 */
fun getEnv(name: String): String {
    return System.getenv(name)
        ?: throw IllegalArgumentException("❌ Missing required environment variable: $name")
}
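The timestamps printed by the test make the symptom measurable. In a true stream, the first event arrives well before the last one. A buffered response delivers all events within a few milliseconds of each other at the end. A hypothetical helper (not part of the SDK) that parses the `[N ms]` offsets and applies that heuristic might look like this. The 10% spread threshold is an arbitrary assumption, and the sample lines are made up, not measured.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical heuristic for the "[N ms] Event: ..." lines printed by the test.
class ArrivalCheck {

    private static final Pattern OFFSET = Pattern.compile("^\\[(\\d+) ms\\]");

    // Extracts millisecond offsets; other lines (start banner, deltas, summary) are skipped.
    static List<Long> offsets(List<String> lines) {
        List<Long> result = new ArrayList<>();
        for (String line : lines) {
            Matcher m = OFFSET.matcher(line);
            if (m.find()) {
                result.add(Long.parseLong(m.group(1)));
            }
        }
        return result;
    }

    // Buffered if first-to-last spread is under maxSpreadFraction of the last offset.
    static boolean looksBuffered(List<Long> offsets, double maxSpreadFraction) {
        if (offsets.size() < 2) {
            return false; // not enough events to tell
        }
        long first = offsets.get(0);
        long last = offsets.get(offsets.size() - 1);
        if (last <= 0) {
            return false;
        }
        return (double) (last - first) / last < maxSpreadFraction;
    }

    public static void main(String[] args) {
        // Illustrative inputs only, not real measurements.
        List<String> streamed = List.of("[900 ms] Event: A", "[1400 ms] Event: B", "[6000 ms] Event: C");
        List<String> batched = List.of("[8000 ms] Event: A", "[8001 ms] Event: B", "[8003 ms] Event: C");
        System.out.println(looksBuffered(offsets(streamed), 0.1)); // false
        System.out.println(looksBuffered(offsets(batched), 0.1));  // true
    }
}
```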

Metadata


Assignees

No one assigned

    Labels

    AI Projects, customer-reported, needs-triage, question

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
