Skip to content

Commit 202e451

Browse files
committed
Fix IoTConsensus multi-folder snapshot load for DataRegion
Collect receive folders that actually hold snapshot fragments and load them in a single call. Loading per folder wipes data dirs before relinking, so repeated loads only kept the last folder's data. Add SnapshotLoader multi-dir support and regression tests.
1 parent b443006 commit 202e451

5 files changed

Lines changed: 216 additions & 23 deletions

File tree

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java

Lines changed: 23 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -118,6 +118,29 @@ default boolean clearSnapshot() {
118118
*/
119119
boolean loadSnapshot(File latestSnapshotRootDir);
120120

121+
/**
122+
* Load the latest snapshot whose fragments are spread across several root dirs. A single snapshot
123+
* may be received into more than one folder (e.g. IoTConsensus spreads a snapshot's fragments
124+
* across one receive folder per local data dir), so the whole snapshot must be loaded in one call
125+
* rather than once per folder: a state machine whose load wipes its data dirs before relinking
126+
* (such as the DataRegion one) would otherwise have each per-folder load erase the fragments
127+
* linked by the previous folders, leaving only the last folder's data.
128+
*
129+
* <p>The default implementation simply loads each dir in turn, which is correct only for state
130+
* machines whose per-dir load is independent; such state machines should override this when their
131+
* load is destructive across dirs.
132+
*
133+
* @param latestSnapshotRootDirs the dirs that actually hold fragments of the snapshot
134+
* @return {@code true} if the snapshot was loaded successfully, {@code false} otherwise.
135+
*/
136+
default boolean loadSnapshot(List<File> latestSnapshotRootDirs) {
137+
boolean success = true;
138+
for (File dir : latestSnapshotRootDirs) {
139+
success = loadSnapshot(dir) && success;
140+
}
141+
return success;
142+
}
143+
121144
/**
122145
* given a snapshot dir, ask statemachine to provide all snapshot files. By default, it will list
123146
* all files recursively under latestSnapshotDir

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -514,26 +514,29 @@ private void clearOldSnapshot() {
514514
public boolean loadSnapshot(String snapshotId) {
515515
// Snapshot fragments are spread across the receive folders by the FolderManager (a DataRegion,
516516
// for example, uses one receive folder per local data dir), so a given snapshot only exists
517-
// under the folders that actually received fragments. Load from those folders and skip the
518-
// others; otherwise the state machine would fail on a folder that never received this snapshot
519-
// and turn a healthy multi-data-dir transfer into a spurious failure.
517+
// under the folders that actually received fragments. Collect exactly those folders and hand
518+
// them to the state machine in a single load call.
519+
//
520+
// It must be a single call rather than one call per folder: the state machine's load is
521+
// destructive (a DataRegion load wipes the data dirs before relinking), so loading folders one
522+
// at a time would make each load erase the fragments linked by the previous folders, leaving
523+
// only the last folder's data. The state machine instead clears the data dirs once and relinks
524+
// every folder's fragments together.
520525
//
521526
// Note: an empty region produces a snapshot with zero fragments, so none of the receive folders
522527
// contains it. That is a legitimate (no-op) load, not a failure, so an absent snapshot must not
523528
// be reported as failure here.
529+
List<File> snapshotDirs = new ArrayList<>();
524530
for (String dir : recvFolderManager.getFolders()) {
525531
File snapshotDir = getSnapshotPath(dir, snapshotId);
526-
if (!snapshotDir.exists()) {
527-
continue;
528-
}
529-
if (!stateMachine.loadSnapshot(snapshotDir)) {
530-
// Stop at the first failure. The snapshot is already broken on this replica, and loading
531-
// the remaining folders is both pointless and harmful: a load wipes the data dirs before
532-
// relinking. Report the failure so the AddPeer coordinator does not activate this peer.
533-
return false;
532+
if (snapshotDir.exists()) {
533+
snapshotDirs.add(snapshotDir);
534534
}
535535
}
536-
return true;
536+
if (snapshotDirs.isEmpty()) {
537+
return true;
538+
}
539+
return stateMachine.loadSnapshot(snapshotDirs);
537540
}
538541

539542
private File getSnapshotPath(String curStorageDir, String snapshotRelativePath) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.io.IOException;
5353
import java.util.ArrayList;
5454
import java.util.List;
55+
import java.util.function.Supplier;
5556

5657
public class DataRegionStateMachine extends BaseStateMachine {
5758

@@ -125,22 +126,43 @@ public boolean clearSnapshot() {
125126

126127
@Override
127128
public boolean loadSnapshot(File latestSnapshotRootDir) {
128-
String databaseName = region.getDatabaseName();
129+
final String databaseName = region.getDatabaseName();
130+
final String dataRegionIdString = region.getDataRegionIdString();
131+
return loadSnapshot(
132+
() ->
133+
new SnapshotLoader(
134+
latestSnapshotRootDir.getAbsolutePath(), databaseName, dataRegionIdString)
135+
.loadSnapshotForStateMachine(),
136+
latestSnapshotRootDir);
137+
}
138+
139+
@Override
140+
public boolean loadSnapshot(List<File> latestSnapshotRootDirs) {
141+
final String databaseName = region.getDatabaseName();
142+
final String dataRegionIdString = region.getDataRegionIdString();
143+
// A single snapshot is spread across several receive folders, and loading wipes the data dirs
144+
// before relinking. It must therefore be loaded in one shot (clear once, relink every folder)
145+
// rather than once per folder, otherwise each per-folder load would erase the previous folders'
146+
// fragments and leave only the last one's data.
147+
final List<String> snapshotRootPaths = new ArrayList<>();
148+
for (File dir : latestSnapshotRootDirs) {
149+
snapshotRootPaths.add(dir.getAbsolutePath());
150+
}
151+
return loadSnapshot(
152+
() ->
153+
new SnapshotLoader(snapshotRootPaths, databaseName, dataRegionIdString)
154+
.loadSnapshotForStateMachine(),
155+
latestSnapshotRootDirs);
156+
}
157+
158+
private boolean loadSnapshot(Supplier<DataRegion> snapshotLoader, Object snapshotRootForLog) {
129159
String dataRegionIdString = region.getDataRegionIdString();
130160
DataRegionId regionId = new DataRegionId(Integer.parseInt(dataRegionIdString));
131161
try {
132162
DataRegion newRegion =
133-
StorageEngine.getInstance()
134-
.setDataRegionForSnapshotLoad(
135-
regionId,
136-
() ->
137-
new SnapshotLoader(
138-
latestSnapshotRootDir.getAbsolutePath(),
139-
databaseName,
140-
dataRegionIdString)
141-
.loadSnapshotForStateMachine());
163+
StorageEngine.getInstance().setDataRegionForSnapshotLoad(regionId, snapshotLoader);
142164
if (newRegion == null) {
143-
logger.error(DataNodeMiscMessages.FAIL_LOAD_SNAPSHOT, latestSnapshotRootDir);
165+
logger.error(DataNodeMiscMessages.FAIL_LOAD_SNAPSHOT, snapshotRootForLog);
144166
return false;
145167
}
146168
this.region = newRegion;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import java.nio.file.attribute.BasicFileAttributes;
4545
import java.util.ArrayList;
4646
import java.util.Arrays;
47+
import java.util.Collections;
4748
import java.util.HashMap;
4849
import java.util.LinkedList;
4950
import java.util.List;
@@ -54,11 +55,26 @@ public class SnapshotLoader {
5455
private Logger LOGGER = LoggerFactory.getLogger(SnapshotLoader.class);
5556
private String storageGroupName;
5657
private String snapshotPath;
58+
private List<String> snapshotPaths;
5759
private String dataRegionId;
5860
private SnapshotLogAnalyzer logAnalyzer;
5961

6062
/**
 * Creates a loader for a snapshot that lives under a single root dir.
 *
 * @param snapshotPath root dir of the snapshot
 * @param storageGroupName name of the database the region belongs to
 * @param dataRegionId id of the data region, as a string
 */
public SnapshotLoader(String snapshotPath, String storageGroupName, String dataRegionId) {
  this.storageGroupName = storageGroupName;
  this.dataRegionId = dataRegionId;
  this.snapshotPath = snapshotPath;
  // Keep the list view consistent with the single path: it always holds exactly this one entry.
  this.snapshotPaths = Collections.singletonList(snapshotPath);
}
68+
69+
/**
 * Creates a loader for a snapshot whose fragments are spread across several dirs. A snapshot
 * received by IoTConsensus is spread across several receive folders (one per local data dir), so
 * loading it means relinking the fragments from all of them. The data dirs must be cleared exactly
 * once, before relinking from any folder; {@link #loadSnapshotForStateMachine()} takes that
 * multi-dir path whenever more than one path is given here.
 *
 * <p>The given list is copied, so later changes made by the caller do not affect this loader.
 *
 * @param snapshotPaths root dirs that hold fragments of the snapshot; the first entry (or {@code
 *     null} for an empty list) is also used as the single snapshot path
 * @param storageGroupName name of the database the region belongs to
 * @param dataRegionId id of the data region, as a string
 */
public SnapshotLoader(List<String> snapshotPaths, String storageGroupName, String dataRegionId) {
  // Defensive copy: the single-dir/multi-dir decision is taken from the list size at load time,
  // so the list must not change behind the loader's back after construction.
  this.snapshotPaths = Collections.unmodifiableList(new ArrayList<>(snapshotPaths));
  this.snapshotPath = this.snapshotPaths.isEmpty() ? null : this.snapshotPaths.get(0);
  this.storageGroupName = storageGroupName;
  this.dataRegionId = dataRegionId;
}
@@ -100,6 +116,10 @@ private File getSnapshotLogFile() {
100116
* @return
101117
*/
102118
public DataRegion loadSnapshotForStateMachine() {
119+
if (snapshotPaths.size() > 1) {
120+
return loadSnapshotFromMultipleDirs();
121+
}
122+
103123
LOGGER.info(
104124
StorageEngineMessages.LOADING_SNAPSHOT_FOR, storageGroupName, dataRegionId, snapshotPath);
105125

@@ -112,6 +132,33 @@ public DataRegion loadSnapshotForStateMachine() {
112132
}
113133
}
114134

135+
/**
136+
* Load a snapshot whose fragments are spread across several dirs (the IoTConsensus receive
137+
* folders). The snapshot log is not transferred during an IoTConsensus snapshot, so every
138+
* received fragment dir takes the without-log path. Crucially, the data dirs are cleared exactly
139+
* once before relinking from all dirs: clearing per-dir (as one load call per dir would) erases
140+
* the fragments linked by the previous dirs and leaves only the last dir's data. Because each dir
141+
* contributes a disjoint set of files, the relink order does not affect the result.
142+
*/
143+
private DataRegion loadSnapshotFromMultipleDirs() {
144+
LOGGER.info(
145+
StorageEngineMessages.LOADING_SNAPSHOT_FOR, storageGroupName, dataRegionId, snapshotPaths);
146+
try {
147+
deleteAllFilesInDataDirs();
148+
LOGGER.info(StorageEngineMessages.REMOVE_ALL_DATA_FILES_IN_ORIGINAL_DIR);
149+
for (String path : snapshotPaths) {
150+
File snapshotDir = new File(path);
151+
createLinksFromSnapshotDirToDataDirWithoutLog(snapshotDir);
152+
loadCompressionRatio(snapshotDir);
153+
}
154+
return loadSnapshot();
155+
} catch (IOException | DiskSpaceInsufficientException e) {
156+
LOGGER.error(
157+
StorageEngineMessages.EXCEPTION_LOADING_SNAPSHOT_FOR, storageGroupName, dataRegionId, e);
158+
return null;
159+
}
160+
}
161+
115162
private DataRegion loadSnapshotWithoutLog() {
116163
try {
117164
try {

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/IoTDBSnapshotTest.java

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,104 @@ public void testLoadSnapshot()
245245
}
246246
}
247247

248+
/**
249+
* Regression test for the multi-receive-folder snapshot load. IoTConsensus spreads a single
250+
* snapshot's fragments across one receive folder per local data dir, so loading must clear the
251+
* data dirs once and relink the fragments from every folder. Before the fix each folder was
252+
* loaded with its own clear-then-relink, so every folder but the last had its just-linked
253+
* fragments wiped by the next folder's clear, losing all but the last folder's data.
254+
*/
255+
@Test
256+
public void testLoadSnapshotSpreadAcrossReceiveFolders()
257+
throws IOException, WriteProcessException {
258+
loadSnapshotSpreadAcrossReceiveFolders(false);
259+
}
260+
261+
/**
262+
* The fragments of one snapshot are disjoint across the receive folders, so the order in which
263+
* the folders are relinked must not change the loaded data. This loads the same spread snapshot
264+
* with the receive folders presented in the opposite order and expects the identical result.
265+
*/
266+
@Test
267+
public void testLoadSnapshotFromReceiveFoldersIsOrderIndependent()
268+
throws IOException, WriteProcessException {
269+
loadSnapshotSpreadAcrossReceiveFolders(true);
270+
}
271+
272+
private void loadSnapshotSpreadAcrossReceiveFolders(boolean reversedOrder)
273+
throws IOException, WriteProcessException {
274+
String[][] originDataDirs = IoTDBDescriptor.getInstance().getConfig().getTierDataDirs();
275+
IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(testDataDirs);
276+
TierManager.getInstance().resetFolders();
277+
String recvBase0 = "target" + File.separator + "recv-snapshot-0";
278+
String recvBase1 = "target" + File.separator + "recv-snapshot-1";
279+
// Each receive folder holds the snapshot under a "snapshot" subdir, exactly as the IoTConsensus
280+
// receiver materializes it, and carries no snapshot log (the log is never transferred).
281+
File recvFolder0 = new File(recvBase0, SNAPSHOT_DIR_NAME);
282+
File recvFolder1 = new File(recvBase1, SNAPSHOT_DIR_NAME);
283+
try {
284+
Assert.assertTrue(recvFolder0.mkdirs());
285+
Assert.assertTrue(recvFolder1.mkdirs());
286+
287+
// Spread the fragments across the two receive folders: even-indexed files in the first,
288+
// odd-indexed files in the second, so neither folder holds the whole snapshot.
289+
int fileNum = 6;
290+
for (int i = 0; i < fileNum; i++) {
291+
writeSnapshotFragment((i % 2 == 0 ? recvFolder0 : recvFolder1).getAbsolutePath(), i);
292+
}
293+
294+
List<String> snapshotDirs =
295+
reversedOrder
296+
? Arrays.asList(recvFolder1.getAbsolutePath(), recvFolder0.getAbsolutePath())
297+
: Arrays.asList(recvFolder0.getAbsolutePath(), recvFolder1.getAbsolutePath());
298+
299+
DataRegion dataRegion =
300+
new SnapshotLoader(snapshotDirs, testSgName, "0").loadSnapshotForStateMachine();
301+
302+
Assert.assertNotNull(dataRegion);
303+
// Every fragment from every receive folder must be present, regardless of relink order.
304+
assertEquals(fileNum, dataRegion.getTsFileManager().getTsFileList(true).size());
305+
} finally {
306+
FileUtils.recursivelyDeleteFolder(recvBase0);
307+
FileUtils.recursivelyDeleteFolder(recvBase1);
308+
IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(originDataDirs);
309+
TierManager.getInstance().resetFolders();
310+
}
311+
}
312+
313+
/**
314+
* Materialize a single snapshot fragment (a TsFile and its resource) under {@code
315+
* <recvSnapshotDir>/sequence/<sg>/0/0/}, i.e. the layout the without-log loader expects from a
316+
* received snapshot.
317+
*/
318+
private void writeSnapshotFragment(String recvSnapshotDir, int i)
319+
throws IOException, WriteProcessException {
320+
String filePath =
321+
recvSnapshotDir
322+
+ File.separator
323+
+ "sequence"
324+
+ File.separator
325+
+ testSgName
326+
+ File.separator
327+
+ "0"
328+
+ File.separator
329+
+ "0"
330+
+ File.separator
331+
+ String.format("%d-%d-0-0.tsfile", i + 1, i + 1);
332+
File newFile = new File(filePath);
333+
Assert.assertTrue(newFile.getParentFile().exists() || newFile.getParentFile().mkdirs());
334+
TsFileGeneratorUtils.generateMixTsFile(filePath, 5, 5, 10, i * 100, (i + 1) * 100, 10, 10);
335+
TsFileResource resource = new TsFileResource(new File(filePath));
336+
resource.updateStartTime(
337+
IDeviceID.Factory.DEFAULT_FACTORY.create(testSgName + PATH_SEPARATOR + "d" + i), i * 100);
338+
resource.updateEndTime(
339+
IDeviceID.Factory.DEFAULT_FACTORY.create(testSgName + PATH_SEPARATOR + "d" + i),
340+
(i + 1) * 100);
341+
resource.updatePlanIndexes(i);
342+
resource.setStatusForTest(TsFileResourceStatus.NORMAL);
343+
resource.serialize();
344+
}
345+
248346
@Ignore("Need manual execution to specify different disks")
249347
@Test
250348
public void testLoadSnapshotNoHardLink()

0 commit comments

Comments
 (0)