Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions src/adapter/src/coord/appends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,14 +590,22 @@ impl Coordinator {
|| "group_commit_apply",
async move {
// Wait for the writes to complete.
match append_fut
let write_result = match append_fut
.instrument(debug_span!("group_commit_apply::append_fut"))
.await
{
Ok(append_result) => {
append_result.unwrap_or_terminate("cannot fail to apply appends")
append_result.unwrap_or_terminate("cannot fail to apply appends");
Ok(())
}
// The writer's oneshot channel was dropped before it sent a
// result, so the writes are in an indefinite state. We must
// not report success to clients, since their writes may not
// have been durably persisted.
Err(_) => {
warn!("Writer terminated with writes in indefinite state");
Err(())
}
Err(_) => warn!("Writer terminated with writes in indefinite state"),
};

// Apply the write by marking the timestamp as complete on the timeline.
Expand All @@ -609,6 +617,17 @@ impl Coordinator {
for response in responses {
let (mut ctx, result) = response.finalize();
ctx.session_mut().apply_write(timestamp);
// If the writer terminated with the writes in an indefinite
// state, report an error rather than telling the client a
// success that we cannot guarantee.
let result = match write_result {
Ok(()) => result,
Err(()) => Err(AdapterError::Internal(
"table write failed: writer terminated with writes in \
an indefinite state"
.to_string(),
)),
};
ctx.retire(result);
}

Expand Down
Loading