3 changes: 2 additions & 1 deletion .gitignore
@@ -3,4 +3,5 @@
out/
build/
.idea/
kotlin-cqrs-eventsourcing.iml
kotlin-cqrs-eventsourcing.iml
.claude/settings.local.json
53 changes: 53 additions & 0 deletions PRD.md
@@ -0,0 +1,53 @@
Kestrel currently supports semi-synchronous projections via BlockingAsyncEventProcessorWaiter. When a command arrives
via the CommandGateway, immediately after the events are sunk using the EventStore, the event store hands off to a hook
that passes the list of sunk events to an instance of BlockingAsyncEventProcessorWaiter.

This class simply waits for any event processors passed to it to catch up to the sequence number of the new event(s).
It blocks the user request thread until they are up to date or a timeout elapses.
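
The waiting behaviour can be pictured as a poll-until-caught-up loop. This is an illustrative sketch with invented names, not Kestrel's actual BlockingAsyncEventProcessorWaiter implementation:

```kotlin
// Illustrative sketch only; function and parameter names are invented,
// not Kestrel's real API.
fun waitUntilCaughtUp(
    targetSequence: Long,
    currentSequence: () -> Long, // reads the processor's bookmark
    timeoutMs: Long = 5_000,
    pollMs: Long = 20,
): Boolean {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (System.currentTimeMillis() < deadline) {
        // The request thread blocks here until the async processor's
        // bookmark reaches the sequence of the newly sunk events.
        if (currentSequence() >= targetSequence) return true
        Thread.sleep(pollMs)
    }
    return false // timed out before the processor caught up
}
```

The slowness complained about below falls out of this shape: every command pays at least one poll interval, plus the hand-off latency to the background process.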

This has been a serviceable quick solution for synchronous projectors, but it is somewhat slow, and relying on a
separate process is not ideal. I'd prefer the ability for a list of local synchronous projectors to handle the events
directly in-thread rather than waiting on a background thread.

Update the EventStore to accept two hooks: a new endOfSinkTransactionHook alongside the existing afterSinkHook. The new
hook should be called within the same transaction and accept a list of SequencedEvents.
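
The intended ordering of the two hooks can be modelled in miniature. Everything here is a simplified stand-in (a real implementation runs inside a database transaction, and the hooks take `List<SequencedEvent<M>>` rather than `List<Long>`):

```kotlin
// Toy model of the two-hook ordering; List<Long> stands in for the
// real List<SequencedEvent<M>>.
class SketchEventStore(
    private val endOfSinkTransactionHook: (List<Long>) -> Unit = { },
    private val afterSinkHook: (List<Long>) -> Unit = { },
) {
    fun sink(sequences: List<Long>) {
        inTransaction {
            // ... events inserted here ...
            // Called inside the transaction: a throw here rolls everything back.
            endOfSinkTransactionHook(sequences)
        }
        // Called only after the transaction has committed.
        afterSinkHook(sequences)
    }

    // Stand-in for a real database transaction block.
    private fun inTransaction(block: () -> Unit) = block()
}
```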

Implement a BlockingSyncEventProcessorUpdater which takes a list of event processors and updates them directly. These
event processors may or may not be at head; for example, a projection may have been caught up asynchronously in the
background and is now being added to the sync list for the first time. Add the processing needed to catch such
processors up to the appropriate sequence number so that they can safely handle the new events. Take inspiration from
BlockingAsyncEventProcessorWaiter only for the part where it filters by event type: not all event processors need to
handle all event types.
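
The event-type filtering amounts to keeping only the events whose class a processor declares. A minimal sketch, with invented event types standing in for the real domain events:

```kotlin
import kotlin.reflect.KClass

// Hypothetical domain events, for illustration only.
interface DomainEvent
class FooEvent : DomainEvent
class BarEvent : DomainEvent

// Keep only the events whose class the processor declares
// (the real code derives the set from domainEventClasses()).
fun filterRelevant(
    handledClasses: Set<KClass<out DomainEvent>>,
    events: List<DomainEvent>,
): List<DomainEvent> = events.filter { it::class in handledClasses }
```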

Granular plan:

## Phase 1: EventStore Hook Enhancement
- [x] Update `RelationalDatabaseEventStore` constructor to accept `endOfSinkTransactionHook` parameter
- [x] Add `endOfSinkTransactionHook` parameter to companion `create()` method with default empty implementation
- [x] **Test**: Update `RelationalDatabaseEventStoreTest` - verify backward compatibility with existing tests
- [x] Modify `sink()` method to call new hook WITHIN the database transaction (before commit)
- [x] **Test**: Verify `endOfSinkTransactionHook` called within transaction and before `afterSinkHook`
- [x] Add error handling for hook failures - wrap in try-catch to allow transaction rollback
- [x] **Test**: Verify transaction rollback when `endOfSinkTransactionHook` throws exception
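
The rollback guarantee being tested above can be modelled in miniature: if the in-transaction hook throws, none of the writes survive. This is a toy snapshot-and-restore model, not the real Exposed transaction machinery:

```kotlin
// Toy transaction: snapshot the state, run the block, restore on failure.
fun <T> inToyTransaction(state: MutableList<T>, block: (MutableList<T>) -> Unit) {
    val snapshot = state.toList()
    try {
        block(state)
    } catch (e: Exception) {
        state.clear()
        state.addAll(snapshot) // roll back to the pre-transaction state
        throw e
    }
}
```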

## Phase 2: Event Filtering and Processor Structure
- [x] Create new `BlockingSyncEventProcessorUpdater` class accepting list of `BookmarkedEventProcessor<M>`
- [x] Implement event filtering logic using `domainEventClasses()` (pattern from `BlockingAsyncEventProcessorWaiter`)
- [x] **Test**: Create `BlockingSyncEventProcessorUpdaterTest` - verify event type filtering works correctly
- [x] **Test**: Test with multiple processors handling different event types

## Phase 3: Catch-up Mechanism
- [x] **Update `BlockingSyncEventProcessorUpdater`**: Add catch-up mechanism - check bookmark vs new events, fetch missed events with `EventSource.getAfter()`
- [x] **Test in `BlockingSyncEventProcessorUpdaterTest`**: Test catch-up scenarios when processors are behind current events
- [x] **Test in `BlockingSyncEventProcessorUpdaterTest`**: Test no-op behavior when processors are already caught up

## Phase 4: Synchronous Processing
- [ ] **Update `BlockingSyncEventProcessorUpdater`**: Implement synchronous event processing - call `processor.process()` for each relevant event
- [ ] **Update `BlockingSyncEventProcessorUpdater`**: Add bookmark updates - save new bookmark after processing each event
- [ ] **Test in `BlockingSyncEventProcessorUpdaterTest`**: Verify bookmark updates after successful processing
- [ ] **Update `BlockingSyncEventProcessorUpdater`**: Handle multiple processors with proper error handling (fail-fast approach)
- [ ] **Test in `BlockingSyncEventProcessorUpdaterTest`**: Test error handling - processor failure causes transaction rollback

## Phase 5: Integration and Final Verification
- [ ] **Test**: Create integration test showing sync and async processors working together
- [ ] **Test**: Test performance impact when no sync processors are configured
- [ ] **Test**: Run full test suite to ensure no regressions: `./gradlew test`
57 changes: 57 additions & 0 deletions progress.txt
@@ -0,0 +1,57 @@
## Phase 1: EventStore Hook Enhancement Progress

### Task Completed (e881603): Add endOfSinkTransactionHook to RelationalDatabaseEventStore
- **PRD Items**: Phase 1 - Constructor update, create() method update, sink() modification, error handling
- **Key Decisions**:
- Added endOfSinkTransactionHook parameter as 8th parameter in constructor (before afterSinkHook)
- Positioned in create() method after eventsTableName param for logical grouping
- Called hook within transaction after all events inserted but before commit
- Added try-catch with re-throw to ensure transaction rollback on hook failures
- Maintained backward compatibility with default empty implementation { }
- **Files Changed**:
- RelationalDatabaseEventStore.kt: Constructor signature, create() method, sink() method
- **Status**: Implementation complete, compiles successfully, basic tests pass
- **Next Tasks**: Create BlockingSyncEventProcessorUpdater class (Phase 2)

### Task Completed (2dab094): Add tests for endOfSinkTransactionHook behavior
- **PRD Items**: Phase 1 - Test backward compatibility, transaction timing, rollback behavior
- **Key Decisions**:
- Verified all existing tests pass without modification (backward compatibility maintained)
- Added test confirming endOfSinkTransactionHook called within transaction before afterSinkHook
- Added test verifying transaction rollback when endOfSinkTransactionHook throws exception
- Used call order tracking and database queries within transaction to prove timing
- **Files Changed**:
- RelationalDatabaseEventStoreTest.kt: Added 2 comprehensive tests
- PRD.md: Marked Phase 1 tests as complete
- **Status**: Phase 1 complete, all tests pass, ready for Phase 2

### Task Completed (44c1ebd): Implement BlockingSyncEventProcessorUpdater with event filtering
- **PRD Items**: Phase 2 - Create BlockingSyncEventProcessorUpdater class, implement event filtering, create tests
- **Key Decisions**:
- Created basic class with processEvents() method accepting List<SequencedEvent<M>>
- Implemented event filtering using domainEventClasses().toSet() pattern from BlockingAsyncEventProcessorWaiter
- Each processor filters events to only those matching its domainEventClasses before processing
- Used forEach loops for processors and their relevant events (simple synchronous processing)
- Created comprehensive test suite covering: event type filtering, multiple processors same event type, empty cases
- Used SpecificMetadata and FooEvent/BarEvent/BazEvent from existing test patterns
- **Files Changed**:
- BlockingSyncEventProcessorUpdater.kt: New class with event filtering logic
- BlockingSyncEventProcessorUpdaterTest.kt: 4 comprehensive tests for filtering behavior
- PRD.md: Marked Phase 2 tasks as complete
- **Status**: Phase 2 complete, basic processing and filtering works, ready for Phase 3 (catch-up mechanism)

### Task Completed (a92de25): Add catch-up mechanism to BlockingSyncEventProcessorUpdater
- **PRD Items**: Phase 3 - Add catch-up mechanism, test catch-up scenarios, test no-op when caught up
- **Key Decisions**:
- Added EventSource<M> parameter to constructor for fetching missed events
- Implemented catch-up logic: check bookmark.sequence vs minNewEventSequence, fetch with eventSource.getAfter()
- Changed forEach to for loop to allow continue statements (Kotlin doesn't allow continue in forEach)
- Only fetch missed events when gap exists (bookmark.sequence < minNewEventSequence - 1)
- Filter fetched events to only those before new events (it.sequence < minNewEventSequence)
- Updated all existing tests to pass testEventSource() parameter
- Added 2 comprehensive catch-up tests: processor behind (gap exists) and processor caught up (no gap)
- **Files Changed**:
- BlockingSyncEventProcessorUpdater.kt: Added EventSource param, catch-up logic, changed forEach to for loop
- BlockingSyncEventProcessorUpdaterTest.kt: Updated existing tests, added 2 catch-up tests
- PRD.md: Marked Phase 3 tasks complete
- **Status**: Phase 3 complete, catch-up mechanism implemented and tested, ready for Phase 4 (synchronous processing)
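
The forEach-to-for change noted above exists because `continue` is only legal in a loop body; inside a `forEach` lambda the closest equivalent is a labelled return. A small illustration:

```kotlin
// In a for loop, `continue` skips to the next iteration.
fun evensViaFor(xs: List<Int>): List<Int> {
    val out = mutableListOf<Int>()
    for (x in xs) {
        if (x % 2 != 0) continue
        out += x
    }
    return out
}

// In forEach, `continue` won't compile; `return@forEach` skips
// the rest of the lambda for the current element instead.
fun evensViaForEach(xs: List<Int>): List<Int> {
    val out = mutableListOf<Int>()
    xs.forEach {
        if (it % 2 != 0) return@forEach
        out += it
    }
    return out
}
```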
18 changes: 18 additions & 0 deletions ralph-once.sh
@@ -0,0 +1,18 @@
#!/bin/bash

claude --permission-mode acceptEdits "@PRD.md @progress.txt \
1. Read the PRD and progress file. \
2. Find the next incomplete task and implement it. \
- tick boxes in the PRD task breakdown as you go \
- run ./gradlew build to prove everything compiles and tests pass, otherwise keep working til it passes \
3. Update progress.txt with what you did \
- treat this as a record for you to refer back to with zero prior context. \
After completing each task, append to progress.txt: \
- Task completed and PRD item reference \
- Key decisions made and reasoning \
- Files changed \
- Any blockers or notes for next iteration \
Keep entries concise. Sacrifice grammar for the sake of concision. This file helps future iterations skip exploration. \
4. Commit your changes including progress.txt and PRD.md. \
Before committing, run ./gradlew build to prove everything compiles and tests pass, otherwise don't commit
ONLY DO ONE TASK AT A TIME. "
@@ -0,0 +1,61 @@
package com.cultureamp.eventsourcing

/**
* Synchronously processes events within a transaction using local processors.
* Unlike BlockingAsyncEventProcessorWaiter which waits for external async processors,
* this class directly executes the processing in the current thread.
*
* Includes catch-up mechanism to ensure processors are brought up to date before
* processing new events.
*/
class BlockingSyncEventProcessorUpdater<M : EventMetadata>(
    private val eventProcessors: List<BookmarkedEventProcessor<M>>,
    private val eventSource: EventSource<M>,
) {

    /**
     * Processes the given events using the configured processors.
     * First performs catch-up for any processors that are behind, then processes the new events.
     * Only processes events that are relevant to each processor based on their domainEventClasses().
     */
    fun processEvents(events: List<SequencedEvent<M>>) {
        if (events.isEmpty()) return

        for (processor in eventProcessors) {
            val eventProcessorEventTypes = processor.eventProcessor.domainEventClasses().toSet()

            // Get current bookmark for this processor
            val currentBookmark = processor.bookmarkStore.bookmarkFor(processor.bookmarkName)

            // Keep only the new events that this processor cares about
            val relevantNewEvents = events.filter { event ->
                eventProcessorEventTypes.contains(event.event.domainEvent::class)
            }

            if (relevantNewEvents.isEmpty()) {
                continue // No relevant events for this processor
            }

            val minNewEventSequence = relevantNewEvents.minOf { it.sequence }

            // Check if we need to catch up - if there's a gap between bookmark and new events
            if (currentBookmark.sequence < minNewEventSequence - 1) {
                // Fetch missed events from bookmark sequence to just before the new events
                val missedEvents = eventSource.getAfter(
                    sequence = currentBookmark.sequence,
                    eventClasses = eventProcessorEventTypes.toList(),
                ).filter { it.sequence < minNewEventSequence }

                // Process missed events first
                missedEvents.forEach { event ->
                    processor.eventProcessor.process(event.event, event.sequence)
                }
            }

            // Process the new relevant events
            relevantNewEvents.forEach { event ->
                processor.eventProcessor.process(event.event, event.sequence)
            }
        }
    }
}
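
The gap check and catch-up in this class can be exercised in miniature with stand-in types (bare `Long` sequence numbers instead of real events; the real class goes through EventSource and the bookmark store):

```kotlin
// Toy model of the catch-up behaviour; all types are simplified stand-ins,
// not Kestrel's API.
class ToyUpdater(
    private val allSequences: List<Long>, // stand-in for EventSource.getAfter
    private var bookmark: Long,           // stand-in for the stored bookmark
) {
    val processed = mutableListOf<Long>()

    fun processEvents(newSequences: List<Long>) {
        if (newSequences.isEmpty()) return
        val minNew = newSequences.minOf { it }
        // Catch up only if a gap exists between the bookmark and the new events.
        if (bookmark < minNew - 1) {
            allSequences
                .filter { it > bookmark && it < minNew } // missed events only
                .forEach { processed += it }
        }
        newSequences.forEach { processed += it }
        bookmark = newSequences.maxOf { it }
    }
}
```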
@@ -43,6 +43,7 @@ class RelationalDatabaseEventStore<M : EventMetadata>(
    private val objectMapper: ObjectMapper,
    private val eventTypeResolver: EventTypeResolver,
    private val blockingLockUntilTransactionEnd: Transaction.() -> CommandError? = { null },
    private val endOfSinkTransactionHook: (List<SequencedEvent<M>>) -> Unit = { },
    private val afterSinkHook: (List<SequencedEvent<M>>) -> Unit = { },
    private val eventsSinkTable: Events = events,
) : EventStore<M> {
@@ -52,6 +53,7 @@ class RelationalDatabaseEventStore<M : EventMetadata>(
        db: Database,
        objectMapper: ObjectMapper = defaultObjectMapper,
        eventsTableName: String = defaultEventsTableName,
        noinline endOfSinkTransactionHook: (List<SequencedEvent<M>>) -> Unit = { },
        noinline afterSinkHook: (List<SequencedEvent<M>>) -> Unit = { },
        eventTypeResolver: EventTypeResolver = defaultEventTypeResolver,
        eventsSequenceStats: EventsSequenceStats? = RelationalDatabaseEventsSequenceStats(db, eventTypeResolver, defaultEventsSequenceStatsTableName).also { it.createSchemaIfNotExists() },
@@ -71,7 +73,7 @@ class RelationalDatabaseEventStore<M : EventMetadata>(
            }
        },
    ): RelationalDatabaseEventStore<M> {
        return RelationalDatabaseEventStore(db, Events(eventsTableName, jsonBody), eventsSequenceStats, M::class.java, objectMapper, eventTypeResolver, blockingLockUntilTransactionEnd, afterSinkHook)
        return RelationalDatabaseEventStore(db, Events(eventsTableName, jsonBody), eventsSequenceStats, M::class.java, objectMapper, eventTypeResolver, blockingLockUntilTransactionEnd, endOfSinkTransactionHook, afterSinkHook)
    }
}

@@ -109,7 +111,10 @@ class RelationalDatabaseEventStore<M : EventMetadata>(
                    eventsSequenceStats?.save(event.domainEvent::class, insertedSequence)
                    SequencedEvent(event, insertedSequence)
                }
            }.let { Right(it) }
            }.let { sequencedEvents ->
                endOfSinkTransactionHook(sequencedEvents)
                Right(sequencedEvents)
            }
        }
    }
} catch (e: ExposedSQLException) {