Conversation
- SseStreamingGatewayFilter 추가: text/event-stream 응답을 writeAndFlushWith로 교체해 게이트웨이에서 즉시 flush - OrderSseRegistry: SseEmitter → Sinks.Many 기반 Flux<ServerSentEvent> 전환 - OrderController.streamOrder: Flux<ServerSentEvent<OrderResponse>> 리턴으로 변경, SSE 이벤트에 status name 포함 - order-api에 spring-boot-starter-webflux 추가 및 web-application-type: servlet 명시
- MemberRegisteredPointInitializer: WalletAlreadyInitializedException 발생 시 markProcessed()가 실행되지 않아 Kafka 무한 재시도 발생하던 문제 수정 - OrderService: 미사용 레거시 클래스 삭제 (CreateOrderUseCase가 동일 역할 수행)
📝 WalkthroughWalkthroughThe pull request migrates Server-Sent Events (SSE) handling from Spring MVC's blocking Changes
Sequence DiagramsequenceDiagram
participant Client
participant Gateway as API Gateway<br/>(SseStreamingGatewayFilter)
participant Controller as OrderController<br/>(streamOrder)
participant Registry as OrderSseRegistry<br/>(Sinks.Many)
participant EventBus as Event Bus<br/>(notify)
Client->>Gateway: GET /orders/{id}/stream
activate Gateway
Gateway->>Controller: Forward request
activate Controller
Controller->>Registry: register(orderId)
activate Registry
Registry->>Registry: Create Sinks.Many<ServerSentEvent>
Registry-->>Controller: Return Flux<ServerSentEvent>
deactivate Registry
Controller->>Controller: Build initial event w/ current status
Controller-->>Gateway: Return Flux (initial + stream)
deactivate Controller
Gateway->>Gateway: Detect text/event-stream Content-Type
Gateway->>Gateway: Decorate response with SseFlushingResponse
Gateway-->>Client: Start streaming response
deactivate Gateway
EventBus->>Registry: notify(orderId, newStatus)
activate Registry
Registry->>Registry: sink.tryEmitNext(ServerSentEvent)
deactivate Registry
Registry-->>Client: Stream event via flushed chunks
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 inconclusive)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
domains/payment/payment-infrastructure/src/main/kotlin/devcoop/occount/payment/infrastructure/event/MemberRegisteredPointInitializer.kt (1)
23-44:⚠️ Potential issue | 🟠 MajorRemove
@Transactionalfrom the Kafka listener method.The
@Transactionalannotation at line 23 wraps both the wallet initialization and the consumed-event marker in the same transaction. WheninitializeWalletUseCase.initialize()encounters a duplicate wallet (and raisesWalletAlreadyInitializedException), the underlyingDuplicateKeyExceptionfrom the repository marks the transaction as rollback-only. Even though the exception is caught, the entire transaction—including the subsequentmarkProcessed()call—will be rolled back. This causes Kafka to retry indefinitely because the consumed-event marker is never persisted.This pattern also violates the transaction guidelines: transactions should only wrap operations that require atomic all-or-nothing semantics (2+ writes that must together succeed or fail). Wallet initialization and event processing are separate concerns. Additionally,
@Transactionalshould not wrap Kafka listener boundaries, as they include messaging I/O.The codebase already follows the correct pattern:
OrderPaymentEventListenerlistener methods are not annotated with@Transactional. Apply the same approach here: remove the annotation and let each operation manage its own transaction scope if needed (wallet initialization via the use case, event marking as a separate JPA save).🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@domains/payment/payment-infrastructure/src/main/kotlin/devcoop/occount/payment/infrastructure/event/MemberRegisteredPointInitializer.kt` around lines 23 - 44, Remove the `@Transactional` annotation from the Kafka listener method onMemberRegistered in MemberRegisteredPointInitializer so the wallet initialization and the consumed-event marker are not wrapped in a single transaction; keep the try/catch for WalletAlreadyInitializedException around initializeWalletUseCase.initialize(event.userId) and ensure markProcessed(consumerName, eventId) remains called afterwards, allowing initializeWalletUseCase and the repository save performed by markProcessed to manage their own transactions independently.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@domains/order/order-api/src/main/kotlin/devcoop/occount/order/api/order/OrderController.kt`:
- Around line 55-62: Register the SSE sink before reading the snapshot to avoid
lost interim updates: call orderSseRegistry.register(orderId) first (keeping the
returned Flux/Sink token), then call getOrderUseCase.getOrder(orderId) to build
the initial ServerSentEvent via ServerSentEvent.builder(current). If the
snapshot shows a final status (current.status.isFinalForClient()), immediately
close/unregister the previously created token/sink (use the register token or
returned disposable) and return a single Flux.just(initialEvent); otherwise
concat Flux.just(initialEvent) with the registered stream from
orderSseRegistry.register(orderId).
In
`@domains/order/order-api/src/main/kotlin/devcoop/occount/order/api/sse/OrderSseRegistry.kt`:
- Around line 17-32: The register/notify logic can unregister a newer sink when
a reconnect replaces sinks[orderId] and then an old sink's doFinally or
final-status path removes it; fix by making register check for an existing sink
and, if present, complete the previous sink before replacing it, store the new
sink in sinks[orderId] and capture the stored instance id to use in doFinally
and notify so removals are conditional (only remove if the map still points to
the same sink instance), and in notify check the Sinks.EmitResult from
tryEmitNext and tryEmitComplete and handle non-success results (log or retry as
appropriate) rather than ignoring them; refer to register, notify, sinks,
doFinally, tryEmitNext, tryEmitComplete, OrderResponse, and isFinalForClient
when locating the changes.
---
Outside diff comments:
In
`@domains/payment/payment-infrastructure/src/main/kotlin/devcoop/occount/payment/infrastructure/event/MemberRegisteredPointInitializer.kt`:
- Around line 23-44: Remove the `@Transactional` annotation from the Kafka
listener method onMemberRegistered in MemberRegisteredPointInitializer so the
wallet initialization and the consumed-event marker are not wrapped in a single
transaction; keep the try/catch for WalletAlreadyInitializedException around
initializeWalletUseCase.initialize(event.userId) and ensure
markProcessed(consumerName, eventId) remains called afterwards, allowing
initializeWalletUseCase and the repository save performed by markProcessed to
manage their own transactions independently.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 7f1809da-cd63-438c-936c-7e8a0ea5270b
📒 Files selected for processing (8)
domains/order/order-api/build.gradledomains/order/order-api/src/main/kotlin/devcoop/occount/order/api/order/OrderController.ktdomains/order/order-api/src/main/kotlin/devcoop/occount/order/api/sse/OrderSseRegistry.ktdomains/order/order-api/src/main/resources/application.yamldomains/order/order-api/src/test/kotlin/devcoop/occount/order/api/order/OrderControllerTest.ktdomains/order/order-application/src/main/kotlin/devcoop/occount/order/application/order/OrderService.ktdomains/payment/payment-infrastructure/src/main/kotlin/devcoop/occount/payment/infrastructure/event/MemberRegisteredPointInitializer.ktgateway/api-gateway/src/main/kotlin/devcoop/occount/gateway/api/presentation/SseStreamingGatewayFilter.kt
💤 Files with no reviewable changes (2)
- domains/order/order-api/src/test/kotlin/devcoop/occount/order/api/order/OrderControllerTest.kt
- domains/order/order-application/src/main/kotlin/devcoop/occount/order/application/order/OrderService.kt
| val current = getOrderUseCase.getOrder(orderId) | ||
| val timeoutMs = timeoutSeconds * 1000L + asyncTimeoutBufferMillis | ||
| val emitter = orderSseRegistry.register(orderId, timeoutMs) | ||
| emitter.send(SseEmitter.event().data(current)) | ||
| val initialEvent = ServerSentEvent.builder(current) | ||
| .event(current.status.name) | ||
| .build() | ||
| if (current.status.isFinalForClient()) { | ||
| emitter.complete() | ||
| return Flux.just(initialEvent) | ||
| } | ||
| return emitter | ||
| return Flux.concat(Flux.just(initialEvent), orderSseRegistry.register(orderId)) |
There was a problem hiding this comment.
Register before reading the snapshot to avoid lost status updates.
getOrderUseCase.getOrder(orderId) runs before orderSseRegistry.register(orderId). If the order changes between those calls, OrderSseRegistry.notify() has no sink and drops the event, leaving the client stuck on the stale initial status. Register first, then read the snapshot, and add a token-aware unregister/close path for the already-final case.
🛠️ Safer sequencing sketch
- val current = getOrderUseCase.getOrder(orderId)
+ val updates = orderSseRegistry.register(orderId)
+ val current = getOrderUseCase.getOrder(orderId)
val initialEvent = ServerSentEvent.builder(current)
.event(current.status.name)
.build()
if (current.status.isFinalForClient()) {
+ // Add a token-aware close/unregister API so this does not remove a newer stream.
+ orderSseRegistry.unregister(orderId)
return Flux.just(initialEvent)
}
- return Flux.concat(Flux.just(initialEvent), orderSseRegistry.register(orderId))
+ return Flux.concat(Flux.just(initialEvent), updates)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@domains/order/order-api/src/main/kotlin/devcoop/occount/order/api/order/OrderController.kt`
around lines 55 - 62, Register the SSE sink before reading the snapshot to avoid
lost interim updates: call orderSseRegistry.register(orderId) first (keeping the
returned Flux/Sink token), then call getOrderUseCase.getOrder(orderId) to build
the initial ServerSentEvent via ServerSentEvent.builder(current). If the
snapshot shows a final status (current.status.isFinalForClient()), immediately
close/unregister the previously created token/sink (use the register token or
returned disposable) and return a single Flux.just(initialEvent); otherwise
concat Flux.just(initialEvent) with the registered stream from
orderSseRegistry.register(orderId).
| fun register(orderId: String): Flux<ServerSentEvent<OrderResponse>> { | ||
| val sink = Sinks.many().unicast().onBackpressureBuffer<ServerSentEvent<OrderResponse>>() | ||
| sinks[orderId] = sink | ||
| return sink.asFlux().doFinally { sinks.remove(orderId) } | ||
| } | ||
|
|
||
| override fun notify(orderId: String, status: OrderStatus, failureReason: String?) { | ||
| val emitter = emitters[orderId] ?: return | ||
| try { | ||
| emitter.send(SseEmitter.event().data(OrderResponse(orderId, status, failureReason))) | ||
| if (status.isFinalForClient()) { | ||
| emitter.complete() | ||
| emitters.remove(orderId) | ||
| } | ||
| } catch (_: Exception) { | ||
| emitters.remove(orderId) | ||
| val sink = sinks[orderId] ?: return | ||
| sink.tryEmitNext( | ||
| ServerSentEvent.builder(OrderResponse(orderId, status, failureReason)) | ||
| .event(status.name) | ||
| .build(), | ||
| ) | ||
| if (status.isFinalForClient()) { | ||
| sink.tryEmitComplete() | ||
| sinks.remove(orderId) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Description: Inspect Reactor sink emission and map cleanup sites.
rg -n -C4 --type-add 'kotlin:*.kt' --type kotlin \
'tryEmit(Next|Complete)|sinks\.remove\(|ConcurrentHashMap<String, Sinks\.Many'Repository: DevCoopBSM/occount-back-msa
Length of output: 3311
Guard sink ownership before removing or replacing it.
A reconnect can overwrite sinks[orderId], then the old stream's doFinally { sinks.remove(orderId) } or final-status cleanup can remove the new active sink. Complete the previous sink intentionally and use conditional remove so stale streams cannot unregister newer ones. Also handle failed tryEmitNext and tryEmitComplete results.
🛠️ Proposed ownership-safe cleanup
fun register(orderId: String): Flux<ServerSentEvent<OrderResponse>> {
val sink = Sinks.many().unicast().onBackpressureBuffer<ServerSentEvent<OrderResponse>>()
- sinks[orderId] = sink
- return sink.asFlux().doFinally { sinks.remove(orderId) }
+ sinks.compute(orderId) { _, previous ->
+ previous?.tryEmitComplete()
+ sink
+ }
+ return sink.asFlux().doFinally { sinks.remove(orderId, sink) }
}
override fun notify(orderId: String, status: OrderStatus, failureReason: String?) {
val sink = sinks[orderId] ?: return
- sink.tryEmitNext(
+ val emitResult = sink.tryEmitNext(
ServerSentEvent.builder(OrderResponse(orderId, status, failureReason))
.event(status.name)
.build(),
)
+ if (emitResult.isFailure) {
+ sinks.remove(orderId, sink)
+ return
+ }
if (status.isFinalForClient()) {
sink.tryEmitComplete()
- sinks.remove(orderId)
+ sinks.remove(orderId, sink)
}
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@domains/order/order-api/src/main/kotlin/devcoop/occount/order/api/sse/OrderSseRegistry.kt`
around lines 17 - 32, The register/notify logic can unregister a newer sink when
a reconnect replaces sinks[orderId] and then an old sink's doFinally or
final-status path removes it; fix by making register check for an existing sink
and, if present, complete the previous sink before replacing it, store the new
sink in sinks[orderId] and capture the stored instance id to use in doFinally
and notify so removals are conditional (only remove if the map still points to
the same sink instance), and in notify check the Sinks.EmitResult from
tryEmitNext and tryEmitComplete and handle non-success results (log or retry as
appropriate) rather than ignoring them; refer to register, notify, sinks,
doFinally, tryEmitNext, tryEmitComplete, OrderResponse, and isFinalForClient
when locating the changes.
SSE 게이트웨이 프록시 문제 및 Kafka 무한 재시도 문제 수정
Summary by CodeRabbit
New Features
Bug Fixes