Skip to content

Commit a18c8ff

Browse files
badrishc, Copilot, and vazois
authored
Fix AOF persistence and WATCH for collection-emptying RMW ops (#1678)
Bug 1: Collection-emptying RMW operations (LPOP, ZREM, HDEL, SREM that empty a collection) were not logged to AOF. InPlaceUpdaterWorker and PostCopyUpdater returned false before setting NeedAofLog when HasRemoveKey was true, so PostRMWOperation never wrote the AOF entry. Data reappeared after restart.

Bug 2: InPlaceUpdaterWorker's HasRemoveKey path did not call IncrementVersion on the watch version map, so WATCH/MULTI/EXEC transactions did not detect the key modification and incorrectly committed.

Bug 3: AofProcessor.RecoverReplay disposed respServerSession in its finally block, but in multi-DB recovery it ran once per database, causing double-dispose and NullReferenceException in GarnetLatencyMetricsSession.Return(). Moved dispose to AofProcessor.Dispose() which runs exactly once.

Fixes #1675

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Vasileios Zois <96085550+vazois@users.noreply.github.com>
1 parent a8fe1c6 commit a18c8ff

File tree

5 files changed

+279
-11
lines changed

5 files changed

+279
-11
lines changed

libs/server/AOF/AofProcessor.cs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,8 @@ public void Dispose()
8888
activeVectorManager?.WaitForVectorOperationsToComplete();
8989
activeVectorManager?.ShutdownReplayTasks();
9090

91-
var databaseSessionsSnapshot = respServerSession.GetDatabaseSessionsSnapshot();
92-
foreach (var dbSession in databaseSessionsSnapshot)
93-
{
94-
dbSession.StorageSession.basicContext.Session?.Dispose();
95-
dbSession.StorageSession.objectStoreBasicContext.Session?.Dispose();
96-
}
91+
aofReplayCoordinator.Dispose();
92+
respServerSession.Dispose();
9793
}
9894

9995
/// <summary>
@@ -180,11 +176,6 @@ unsafe void ProcessAofRecord(IMemoryOwner<byte> entry, int length)
180176
if (storeWrapper.serverOptions.FailOnRecoveryError)
181177
throw;
182178
}
183-
finally
184-
{
185-
aofReplayCoordinator.Dispose();
186-
respServerSession.Dispose();
187-
}
188179

189180
return -1;
190181
}

libs/server/Storage/Functions/ObjectStore/RMWMethods.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,10 @@ bool InPlaceUpdaterWorker(ref byte[] key, ref ObjectInput input, ref IGarnetObje
145145
{
146146
functionsState.objectStoreSizeTracker?.AddTrackedSize(-value.Size);
147147
value = null;
148+
if (!rmwInfo.RecordInfo.Modified)
149+
functionsState.watchVersionMap.IncrementVersion(rmwInfo.KeyHash);
150+
if (functionsState.appendOnlyFile != null)
151+
rmwInfo.UserData |= NeedAofLog;
148152
rmwInfo.Action = RMWAction.ExpireAndStop;
149153
return false;
150154
}
@@ -238,6 +242,10 @@ public bool PostCopyUpdater(ref byte[] key, ref ObjectInput input, ref IGarnetOb
238242

239243
if (output.HasRemoveKey)
240244
{
245+
// Log to AOF before returning, so the mutation that emptied the collection
246+
// is persisted and replayed correctly on recovery.
247+
if (functionsState.appendOnlyFile != null)
248+
rmwInfo.UserData |= NeedAofLog;
241249
rmwInfo.Action = RMWAction.ExpireAndStop;
242250
return false;
243251
}

test/Garnet.test/MultiDatabaseTests.cs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1283,6 +1283,45 @@ public void MultiDatabaseAofRecoverObjectTest()
12831283
}
12841284
}
12851285

1286+
[Test]
1287+
public void MultiDatabaseAofObjectMutationRecoverTest()
1288+
{
1289+
// Verify that object mutation operations (LPOP, HDEL) that empty collections
1290+
// are persisted correctly across multiple databases during AOF recovery
1291+
var listKey = "list:key1";
1292+
var hashKey = "hash:key1";
1293+
1294+
using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
1295+
{
1296+
// DB 0: push and pop a list element (empties the list)
1297+
var db0 = redis.GetDatabase(0);
1298+
db0.ListLeftPush(listKey, "value1");
1299+
db0.ListLeftPop(listKey);
1300+
ClassicAssert.IsFalse(db0.KeyExists(listKey));
1301+
1302+
// DB 1: add and delete hash fields (empties the hash)
1303+
var db1 = redis.GetDatabase(1);
1304+
db1.HashSet(hashKey, [new HashEntry("f1", "v1"), new HashEntry("f2", "v2")]);
1305+
db1.HashDelete(hashKey, ["f1", "f2"]);
1306+
ClassicAssert.IsFalse(db1.KeyExists(hashKey));
1307+
}
1308+
1309+
server.Store.CommitAOF(true);
1310+
server.Dispose(false);
1311+
server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, tryRecover: true, enableAOF: true);
1312+
server.Start();
1313+
1314+
using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
1315+
{
1316+
// After recovery, both keys should remain absent
1317+
var db0 = redis.GetDatabase(0);
1318+
ClassicAssert.IsFalse(db0.KeyExists(listKey));
1319+
1320+
var db1 = redis.GetDatabase(1);
1321+
ClassicAssert.IsFalse(db1.KeyExists(hashKey));
1322+
}
1323+
}
1324+
12861325
[Test]
12871326
public void MultiDatabaseSaveInProgressTest()
12881327
{

test/Garnet.test/RespAofTests.cs

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -790,6 +790,196 @@ public void AofListObjectStoreRecoverTest()
790790
}
791791
}
792792

793+
[Test]
794+
public void AofObjectStoreRMWDeleteRecoverListTest()
795+
{
796+
// Verify LPOP that empties a list is persisted to AOF and recovered correctly
797+
var key = "AofObjectStoreRMWDeleteRecoverListKey";
798+
using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
799+
{
800+
var db = redis.GetDatabase(0);
801+
802+
db.ListLeftPush(key, "value1");
803+
var popped = db.ListLeftPop(key);
804+
ClassicAssert.AreEqual("value1", popped.ToString());
805+
806+
// Key should not exist after popping the only element
807+
ClassicAssert.IsFalse(db.KeyExists(key));
808+
}
809+
810+
server.Store.CommitAOF(true);
811+
server.Dispose(false);
812+
server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, tryRecover: true, enableAOF: true);
813+
server.Start();
814+
815+
using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
816+
{
817+
var db = redis.GetDatabase(0);
818+
// After recovery, the key should still not exist
819+
ClassicAssert.IsFalse(db.KeyExists(key));
820+
}
821+
}
822+
823+
[Test]
824+
public void AofObjectStoreRMWDeleteRecoverSortedSetTest()
825+
{
826+
// Verify ZREM that removes one member from a sorted set is persisted to AOF and recovered correctly
827+
var key = "AofObjectStoreRMWDeleteRecoverSortedSetKey";
828+
using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
829+
{
830+
var db = redis.GetDatabase(0);
831+
832+
db.SortedSetAdd(key, [new SortedSetEntry("top1", 50), new SortedSetEntry("top2", 60)]);
833+
db.SortedSetRemove(key, "top1");
834+
835+
// Sorted set should have one remaining member
836+
var score = db.SortedSetScore(key, "top1");
837+
ClassicAssert.IsFalse(score.HasValue);
838+
839+
score = db.SortedSetScore(key, "top2");
840+
ClassicAssert.IsTrue(score.HasValue);
841+
ClassicAssert.AreEqual(60, score.Value);
842+
}
843+
844+
server.Store.CommitAOF(true);
845+
server.Dispose(false);
846+
server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, tryRecover: true, enableAOF: true);
847+
server.Start();
848+
849+
using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
850+
{
851+
var db = redis.GetDatabase(0);
852+
853+
// After recovery, "top1" should still be absent
854+
var score = db.SortedSetScore(key, "top1");
855+
ClassicAssert.IsFalse(score.HasValue);
856+
857+
score = db.SortedSetScore(key, "top2");
858+
ClassicAssert.IsTrue(score.HasValue);
859+
ClassicAssert.AreEqual(60, score.Value);
860+
}
861+
}
862+
863+
[Test]
864+
public void AofObjectStoreRMWDeleteRecoverSortedSetEmptyTest()
865+
{
866+
// Verify ZREM that empties a sorted set completely is persisted to AOF and recovered correctly
867+
var key = "AofObjectStoreRMWDeleteRecoverSortedSetEmptyKey";
868+
using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
869+
{
870+
var db = redis.GetDatabase(0);
871+
872+
db.SortedSetAdd(key, [new SortedSetEntry("top1", 50)]);
873+
db.SortedSetRemove(key, "top1");
874+
875+
// Key should not exist after removing the only member
876+
ClassicAssert.IsFalse(db.KeyExists(key));
877+
}
878+
879+
server.Store.CommitAOF(true);
880+
server.Dispose(false);
881+
server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, tryRecover: true, enableAOF: true);
882+
server.Start();
883+
884+
using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
885+
{
886+
var db = redis.GetDatabase(0);
887+
ClassicAssert.IsFalse(db.KeyExists(key));
888+
}
889+
}
890+
891+
[Test]
892+
public void AofObjectStoreRMWDeleteRecoverHashTest()
893+
{
894+
// Verify HDEL that removes fields from a hash is persisted to AOF and recovered correctly
895+
var key = "AofObjectStoreRMWDeleteRecoverHashKey";
896+
using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
897+
{
898+
var db = redis.GetDatabase(0);
899+
900+
db.HashSet(key, [new HashEntry("hkey1", "v1"), new HashEntry("hkey2", "v2")]);
901+
db.HashDelete(key, ["hkey1", "hkey2"]);
902+
903+
// Key should not exist after deleting all hash fields
904+
ClassicAssert.IsFalse(db.KeyExists(key));
905+
}
906+
907+
server.Store.CommitAOF(true);
908+
server.Dispose(false);
909+
server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, tryRecover: true, enableAOF: true);
910+
server.Start();
911+
912+
using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
913+
{
914+
var db = redis.GetDatabase(0);
915+
ClassicAssert.IsFalse(db.KeyExists(key));
916+
}
917+
}
918+
919+
[Test]
920+
public void AofObjectStoreRMWDeleteRecoverSetTest()
921+
{
922+
// Verify SREM that empties a set is persisted to AOF and recovered correctly
923+
var key = "AofObjectStoreRMWDeleteRecoverSetKey";
924+
using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
925+
{
926+
var db = redis.GetDatabase(0);
927+
928+
db.SetAdd(key, ["member1", "member2"]);
929+
db.SetRemove(key, ["member1", "member2"]);
930+
931+
ClassicAssert.IsFalse(db.KeyExists(key));
932+
}
933+
934+
server.Store.CommitAOF(true);
935+
server.Dispose(false);
936+
server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, tryRecover: true, enableAOF: true);
937+
server.Start();
938+
939+
using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
940+
{
941+
var db = redis.GetDatabase(0);
942+
ClassicAssert.IsFalse(db.KeyExists(key));
943+
}
944+
}
945+
946+
[Test]
947+
public void AofObjectStoreRMWPartialDeleteRecoverHashTest()
948+
{
949+
// Verify HDEL that removes only some fields (not emptying the hash) is persisted to AOF and recovered correctly
950+
var key = "AofObjectStoreRMWPartialDeleteRecoverHashKey";
951+
using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
952+
{
953+
var db = redis.GetDatabase(0);
954+
955+
db.HashSet(key, [new HashEntry("hkey1", "v1"), new HashEntry("hkey2", "v2"), new HashEntry("hkey3", "v3")]);
956+
db.HashDelete(key, "hkey1");
957+
db.HashDelete(key, "hkey2");
958+
959+
// Key should still exist with remaining field
960+
ClassicAssert.IsTrue(db.KeyExists(key));
961+
ClassicAssert.AreEqual("v3", db.HashGet(key, "hkey3").ToString());
962+
ClassicAssert.IsFalse(db.HashGet(key, "hkey1").HasValue);
963+
ClassicAssert.IsFalse(db.HashGet(key, "hkey2").HasValue);
964+
}
965+
966+
server.Store.CommitAOF(true);
967+
server.Dispose(false);
968+
server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, tryRecover: true, enableAOF: true);
969+
server.Start();
970+
971+
using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
972+
{
973+
var db = redis.GetDatabase(0);
974+
975+
// After recovery, deleted fields should remain deleted
976+
ClassicAssert.IsTrue(db.KeyExists(key));
977+
ClassicAssert.AreEqual("v3", db.HashGet(key, "hkey3").ToString());
978+
ClassicAssert.IsFalse(db.HashGet(key, "hkey1").HasValue);
979+
ClassicAssert.IsFalse(db.HashGet(key, "hkey2").HasValue);
980+
}
981+
}
982+
793983
[Test]
794984
public void AofCustomTxnRecoverTest()
795985
{

test/Garnet.test/TransactionTests.cs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -587,5 +587,45 @@ private static void updateKey(string key, string value)
587587
var expectedResponse = "+OK\r\n";
588588
TestUtils.AssertEqualUpToExpectedLength(expectedResponse, response);
589589
}
590+
591+
[Test]
592+
public void WatchFailsWhenListEmptiedByLPop()
593+
{
594+
// WATCH a list key, then LPOP all elements on the same connection.
595+
// The LPOP that empties the list should increment the watch version,
596+
// causing the subsequent EXEC to fail.
597+
using var lightClientRequest = TestUtils.CreateRequest();
598+
var key = "watchlist";
599+
600+
// Create a single-element list
601+
var response = lightClientRequest.SendCommand($"LPUSH {key} value1");
602+
var expectedResponse = ":1\r\n";
603+
TestUtils.AssertEqualUpToExpectedLength(expectedResponse, response);
604+
605+
// WATCH the list key
606+
response = lightClientRequest.SendCommand($"WATCH {key}");
607+
expectedResponse = "+OK\r\n";
608+
TestUtils.AssertEqualUpToExpectedLength(expectedResponse, response);
609+
610+
// LPOP the only element (empties and deletes the list) — same connection, before MULTI
611+
response = lightClientRequest.SendCommand($"LPOP {key}");
612+
expectedResponse = "$6\r\nvalue1\r\n";
613+
TestUtils.AssertEqualUpToExpectedLength(expectedResponse, response);
614+
615+
// Start a transaction
616+
response = lightClientRequest.SendCommand("MULTI");
617+
expectedResponse = "+OK\r\n";
618+
TestUtils.AssertEqualUpToExpectedLength(expectedResponse, response);
619+
620+
// Queue a command
621+
response = lightClientRequest.SendCommand($"LPUSH {key} value2");
622+
expectedResponse = "+QUEUED\r\n";
623+
TestUtils.AssertEqualUpToExpectedLength(expectedResponse, response);
624+
625+
// EXEC should fail because the watched key was modified by LPOP
626+
response = lightClientRequest.SendCommand("EXEC");
627+
expectedResponse = "*-1";
628+
TestUtils.AssertEqualUpToExpectedLength(expectedResponse, response);
629+
}
590630
}
591631
}

0 commit comments

Comments
 (0)