Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 66 additions & 20 deletions core/mvcc/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1278,35 +1278,63 @@ impl<Clock: LogicalClock> CommitStateMachine<Clock> {
log_record.header = Some(*tx.header.read());
}

for id in &self.write_set {
// Process schema rows (sqlite_schema) before data rows so that during log
// replay the table_id_to_rootpage map is populated before data row inserts
// reference it. The SkipSet iteration order sorts by table_id (most negative
// first), which would otherwise place data table rows (e.g. table_id=-3)
// before schema rows (table_id=-1).

// Rewrite a table_id into the canonical form used by the log. After a
// checkpoint, the in-memory table_id of a table (e.g. -53) can diverge from
// -(root_page) (e.g. -58). Recovery's bootstrap rebuilds the map keyed by
// -(root_page), so log records must carry that canonical id to be resolvable.
let canonicalize_table_id = |version: &mut RowVersion| {
    let current = version.row.id.table_id;
    // The schema table's id is fixed; it is never remapped.
    if current == SQLITE_SCHEMA_MVCC_TABLE_ID {
        return;
    }
    // Only tables with a known (checkpointed) root page have a canonical form.
    let Some(entry) = mvcc_store.table_id_to_rootpage.get(&current) else {
        return;
    };
    let Some(root_page) = *entry.value() else {
        return;
    };
    let canonical = MVTableId::from(-(root_page as i64));
    if canonical != current {
        version.row.id.table_id = canonical;
    }
};

let collect_versions = |id: &RowID,
log_record: &mut LogRecord,
did_commit_schema: &mut bool| {
if let Some(row_versions) = mvcc_store.rows.get(id) {
let row_versions = row_versions.value().read();
for row_version in row_versions.iter() {
let mut committed_version = row_version.clone();
let mut changed = false;
if let Some(TxTimestampOrID::TxID(id)) = committed_version.begin {
if id == self.tx_id {
if let Some(TxTimestampOrID::TxID(vid)) = committed_version.begin {
if vid == self.tx_id {
// New version is valid STARTING FROM the committing
// transaction's end timestamp. See Hekaton page 299.
committed_version.begin = Some(TxTimestampOrID::Timestamp(end_ts));
changed = true;
if committed_version.row.id.table_id == SQLITE_SCHEMA_MVCC_TABLE_ID {
self.did_commit_schema_change = true;
*did_commit_schema = true;
}
}
}
if let Some(TxTimestampOrID::TxID(id)) = committed_version.end {
if id == self.tx_id {
if let Some(TxTimestampOrID::TxID(vid)) = committed_version.end {
if vid == self.tx_id {
// Old version is valid UNTIL the committing
// transaction's end timestamp. See Hekaton page 299.
committed_version.end = Some(TxTimestampOrID::Timestamp(end_ts));
changed = true;
if committed_version.row.id.table_id == SQLITE_SCHEMA_MVCC_TABLE_ID {
self.did_commit_schema_change = true;
*did_commit_schema = true;
}
}
}
if changed {
canonicalize_table_id(&mut committed_version);
mvcc_store
.insert_version_raw(&mut log_record.row_versions, committed_version);
}
Expand All @@ -1323,23 +1351,24 @@ impl<Clock: LogicalClock> CommitStateMachine<Clock> {
for row_version in row_versions.iter() {
let mut committed_version = row_version.clone();
let mut changed = false;
if let Some(TxTimestampOrID::TxID(id)) = committed_version.begin {
if id == self.tx_id {
if let Some(TxTimestampOrID::TxID(vid)) = committed_version.begin {
if vid == self.tx_id {
// New version is valid STARTING FROM the committing
// transaction's end timestamp. See Hekaton page 299.
committed_version.begin = Some(TxTimestampOrID::Timestamp(end_ts));
changed = true;
}
}
if let Some(TxTimestampOrID::TxID(id)) = committed_version.end {
if id == self.tx_id {
if let Some(TxTimestampOrID::TxID(vid)) = committed_version.end {
if vid == self.tx_id {
// Old version is valid UNTIL the committing
// transaction's end timestamp. See Hekaton page 299.
committed_version.end = Some(TxTimestampOrID::Timestamp(end_ts));
changed = true;
}
}
if changed {
canonicalize_table_id(&mut committed_version);
mvcc_store.insert_version_raw(
&mut log_record.row_versions,
committed_version,
Expand All @@ -1348,6 +1377,19 @@ impl<Clock: LogicalClock> CommitStateMachine<Clock> {
}
}
}
};

// First pass: schema rows only
for id in &self.write_set {
if id.table_id == SQLITE_SCHEMA_MVCC_TABLE_ID {
collect_versions(id, &mut log_record, &mut self.did_commit_schema_change);
}
}
// Second pass: all non-schema rows
for id in &self.write_set {
if id.table_id != SQLITE_SCHEMA_MVCC_TABLE_ID {
collect_versions(id, &mut log_record, &mut self.did_commit_schema_change);
}
}

log_record
Expand Down Expand Up @@ -4670,12 +4712,14 @@ impl<Clock: LogicalClock> MvStore<Clock> {
let rowid_int = rowid.row_id.to_int_or_panic();
schema_rows.insert(rowid_int, record);
needs_schema_rebuild.set(true);
} else {
turso_assert!(self.table_id_to_rootpage.get(&rowid.table_id).is_some(),
"Logical log contains a row version insert with a table id that does not exist in the table_id_to_rootpage map",
{"table_id": rowid.table_id,
"table_id_to_rootpage_map": format!("{:?}", self.table_id_to_rootpage.iter().collect::<Vec<_>>())
});
} else if self.table_id_to_rootpage.get(&rowid.table_id).is_none() {
// Data row references a table_id not yet in the map. This can happen
// with logs written before the schema-first serialization fix: in a
// same-transaction CREATE TABLE + INSERT + DROP TABLE, data rows were
// serialized before the schema INSERT that registers the table_id.
// The schema INSERT (or DELETE) for this table will follow later in
// this transaction frame, so we register the table_id now.
self.insert_table_id_to_rootpage(rowid.table_id, None);
}

let version_id = self.get_version_id();
Expand Down Expand Up @@ -4705,9 +4749,11 @@ impl<Clock: LogicalClock> MvStore<Clock> {
if commit_ts <= replay_cutoff_ts {
continue;
}
turso_assert!(self.table_id_to_rootpage.get(&rowid.table_id).is_some(),
"Logical log contains a row version delete with a table id that does not exist in the table_id_to_rootpage map",
{"rootpage_map": rowid.table_id});
if self.table_id_to_rootpage.get(&rowid.table_id).is_none() {
// See comment in UpsertTableRow: old logs may have data rows
// serialized before the schema INSERT that registers the table_id.
self.insert_table_id_to_rootpage(rowid.table_id, None);
}
if let Some(versions) = self.rows.get(&rowid) {
// Row exists in memory — try to find the current (non-ended) version
// that was committed before this delete, and mark it as ended. If no
Expand Down
194 changes: 194 additions & 0 deletions core/mvcc/database/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8499,3 +8499,197 @@ fn test_encrypted_recovery_corrupted_ciphertext() {
assert_eq!(rows[0][0].as_int().unwrap(), 1);
assert_eq!(rows[0][1].to_string(), "survives");
}

/// Reproducer for a bug where log replay after checkpoint-restart-checkpoint-restart
/// panics with "table id that does not exist in the table_id_to_rootpage map".
///
/// Simulator scenario being reproduced:
/// 1. Create many tables, insert data, checkpoint (tables receive positive root pages)
/// 2. Restart (recovery rebuilds table_id_to_rootpage from the btree schema)
/// 3. Create more tables and insert into both old and new tables
/// 4. Checkpoint (every table now has a positive root page; the log is truncated)
/// 5. Insert more data into all tables (un-checkpointed, written to the log with
///    table IDs assigned in this server incarnation)
/// 6. Restart → bootstrap rebuilds the map from btree root pages, then log replay
///    sees row inserts whose table IDs may not match the bootstrap mapping
#[test]
fn test_recovery_many_tables_checkpoint_restart_checkpoint_restart() {
    let mut db = MvccTestDbNoConn::new_with_random_db();
    let initial_count = 50;
    let extra_count = 30;

    // Step 1: create the initial tables, seed one row each, checkpoint.
    {
        let conn = db.connect();
        for n in 0..initial_count {
            conn.execute(&format!(
                "CREATE TABLE t{}(id INTEGER PRIMARY KEY, v TEXT)",
                n
            ))
            .unwrap();
            conn.execute(&format!("INSERT INTO t{} VALUES (1, 'init')", n))
                .unwrap();
        }
        conn.execute("PRAGMA wal_checkpoint(TRUNCATE)").unwrap();
        conn.close().unwrap();
    }

    // Step 2: restart, as a server redeploy would.
    db.restart();

    // Steps 3-5: new tables, writes to the old ones, checkpoint, more writes.
    {
        let conn = db.connect();
        // New tables in this incarnation get fresh negative table IDs.
        for n in 0..extra_count {
            conn.execute(&format!(
                "CREATE TABLE extra{}(id INTEGER PRIMARY KEY, v TEXT)",
                n
            ))
            .unwrap();
            conn.execute(&format!("INSERT INTO extra{} VALUES (1, 'extra')", n))
                .unwrap();
        }
        // Touch every original table as well.
        for n in 0..initial_count {
            conn.execute(&format!("INSERT INTO t{} VALUES (2, 'after_restart')", n))
                .unwrap();
        }
        // Step 4: checkpoint — all tables get positive root pages, log truncated.
        conn.execute("PRAGMA wal_checkpoint(TRUNCATE)").unwrap();

        // Step 5: post-checkpoint writes that live only in the log.
        for n in 0..initial_count {
            conn.execute(&format!("INSERT INTO t{} VALUES (3, 'post_ckpt2')", n))
                .unwrap();
        }
        for n in 0..extra_count {
            conn.execute(&format!(
                "INSERT INTO extra{} VALUES (2, 'extra_post_ckpt')",
                n
            ))
            .unwrap();
        }
        conn.close().unwrap();
    }

    // Step 6: the second restart must replay the log without panicking.
    db.restart();

    // Every table must come back with all of its rows.
    {
        let conn = db.connect();
        for n in 0..initial_count {
            let rows = get_rows(&conn, &format!("SELECT id, v FROM t{} ORDER BY id", n));
            assert_eq!(
                rows.len(),
                3,
                "table t{n} should have 3 rows, got {}",
                rows.len()
            );
        }
        for n in 0..extra_count {
            let rows = get_rows(&conn, &format!("SELECT id, v FROM extra{} ORDER BY id", n));
            assert_eq!(
                rows.len(),
                2,
                "table extra{n} should have 2 rows, got {}",
                rows.len()
            );
        }
    }
}

/// Variant that drives three restart cycles, creating a fresh batch of tables
/// in every server incarnation. This stresses the table_id_to_rootpage
/// mapping more aggressively than a single restart pair.
#[test]
fn test_recovery_three_restarts_with_table_creation() {
    let mut db = MvccTestDbNoConn::new_with_random_db();
    let table_count = 20;

    // Incarnation 1: create the "a" tables, seed them, checkpoint.
    {
        let conn = db.connect();
        for n in 0..table_count {
            conn.execute(&format!(
                "CREATE TABLE a{}(id INTEGER PRIMARY KEY, v TEXT)",
                n
            ))
            .unwrap();
            conn.execute(&format!("INSERT INTO a{} VALUES (1, 'a')", n))
                .unwrap();
        }
        conn.execute("PRAGMA wal_checkpoint(TRUNCATE)").unwrap();
        conn.close().unwrap();
    }

    db.restart();

    // Incarnation 2: add the "b" tables, write to "a", checkpoint, then leave
    // un-checkpointed writes sitting in the log.
    {
        let conn = db.connect();
        for n in 0..table_count {
            conn.execute(&format!(
                "CREATE TABLE b{}(id INTEGER PRIMARY KEY, v TEXT)",
                n
            ))
            .unwrap();
            conn.execute(&format!("INSERT INTO b{} VALUES (1, 'b')", n))
                .unwrap();
        }
        for n in 0..table_count {
            conn.execute(&format!("INSERT INTO a{} VALUES (2, 'a2')", n))
                .unwrap();
        }
        conn.execute("PRAGMA wal_checkpoint(TRUNCATE)").unwrap();
        // These writes stay in the log until the next replay.
        for n in 0..table_count {
            conn.execute(&format!("INSERT INTO a{} VALUES (3, 'a3')", n))
                .unwrap();
            conn.execute(&format!("INSERT INTO b{} VALUES (2, 'b2')", n))
                .unwrap();
        }
        conn.close().unwrap();
    }

    db.restart();

    // Incarnation 3: add the "c" tables, write everywhere, checkpoint, then
    // leave un-checkpointed writes to every table.
    {
        let conn = db.connect();
        for n in 0..table_count {
            conn.execute(&format!(
                "CREATE TABLE c{}(id INTEGER PRIMARY KEY, v TEXT)",
                n
            ))
            .unwrap();
            conn.execute(&format!("INSERT INTO c{} VALUES (1, 'c')", n))
                .unwrap();
        }
        for n in 0..table_count {
            conn.execute(&format!("INSERT INTO a{} VALUES (4, 'a4')", n))
                .unwrap();
            conn.execute(&format!("INSERT INTO b{} VALUES (3, 'b3')", n))
                .unwrap();
        }
        conn.execute("PRAGMA wal_checkpoint(TRUNCATE)").unwrap();
        for n in 0..table_count {
            conn.execute(&format!("INSERT INTO a{} VALUES (5, 'a5')", n))
                .unwrap();
            conn.execute(&format!("INSERT INTO b{} VALUES (4, 'b4')", n))
                .unwrap();
            conn.execute(&format!("INSERT INTO c{} VALUES (2, 'c2')", n))
                .unwrap();
        }
        conn.close().unwrap();
    }

    // The final restart must replay the log without panicking.
    db.restart();

    // All rows written across the three incarnations must survive recovery.
    {
        let conn = db.connect();
        for n in 0..table_count {
            let rows = get_rows(&conn, &format!("SELECT id FROM a{} ORDER BY id", n));
            assert_eq!(rows.len(), 5, "table a{n} should have 5 rows");
            let rows = get_rows(&conn, &format!("SELECT id FROM b{} ORDER BY id", n));
            assert_eq!(rows.len(), 4, "table b{n} should have 4 rows");
            let rows = get_rows(&conn, &format!("SELECT id FROM c{} ORDER BY id", n));
            assert_eq!(rows.len(), 2, "table c{n} should have 2 rows");
        }
    }
}
Loading
Loading