Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,36 @@ public abstract class CompactTask implements Callable<CompactResult> {

@Nullable private final CompactionMetrics.Reporter metricsReporter;

@Nullable private String partitionString;
private int bucket = -1;

/**
 * Creates a compact task.
 *
 * @param metricsReporter reporter that receives compaction metrics; may be {@code null}, in
 *     which case {@link #call()} skips the reporter calls
 */
public CompactTask(@Nullable CompactionMetrics.Reporter metricsReporter) {
this.metricsReporter = metricsReporter;
}

/**
 * Set partition and bucket info for logging purposes.
 *
 * <p>The values are only stored here; {@link #call()} includes them in its log messages. Until
 * this method is called the partition string is {@code null} and the bucket is {@code -1}.
 *
 * @param partitionString readable partition identifier, may be {@code null}
 * @param bucket bucket id to show in the log messages
 */
public void setPartitionBucketInfo(@Nullable String partitionString, int bucket) {
this.partitionString = partitionString;
this.bucket = bucket;
}

@Override
public CompactResult call() throws Exception {
MetricUtils.safeCall(this::startTimer, LOG);
LOG.info(
"Paimon compact task started: partition={}, bucket={}, taskType={}",
partitionString,
bucket,
getClass().getSimpleName());
try {
long startMillis = System.currentTimeMillis();
CompactResult result = doCompact();
long durationMs = System.currentTimeMillis() - startMillis;

MetricUtils.safeCall(
() -> {
if (metricsReporter != null) {
metricsReporter.reportCompactionTime(
System.currentTimeMillis() - startMillis);
metricsReporter.reportCompactionTime(durationMs);
metricsReporter.increaseCompactionsCompletedCount();
metricsReporter.reportCompactionInputSize(
result.before().stream()
Expand All @@ -68,10 +82,30 @@ public CompactResult call() throws Exception {
},
LOG);

LOG.info(
"Paimon compact task finished: partition={}, bucket={}, taskType={}, "
+ "inputFiles={}, inputBytes={}, outputFiles={}, outputBytes={}, durationMs={}",
partitionString,
bucket,
getClass().getSimpleName(),
result.before().size(),
result.before().stream().mapToLong(DataFileMeta::fileSize).sum(),
result.after().size(),
result.after().stream().mapToLong(DataFileMeta::fileSize).sum(),
durationMs);

if (LOG.isDebugEnabled()) {
LOG.debug(logMetric(startMillis, result.before(), result.after()));
}
return result;
} catch (Exception e) {
LOG.warn(
"Paimon compact task failed: partition={}, bucket={}, taskType={}",
partitionString,
bucket,
getClass().getSimpleName(),
e);
throw e;
} finally {
MetricUtils.safeCall(this::stopTimer, LOG);
MetricUtils.safeCall(this::decreaseCompactionsQueuedCount, LOG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ public RowType readValueType() {
return readValueType;
}

/** Returns the {@link FileStorePathFactory} held by this instance. */
public FileStorePathFactory pathFactory() {
return pathFactory;
}

public KeyValueFileReaderFactory build(
BinaryRow partition, int bucket, DeletionVector.Factory dvFactory) {
return build(partition, bucket, dvFactory, true, Collections.emptyList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.compact.CompactTask;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.io.DataFileMeta;
Expand All @@ -33,6 +34,7 @@
import org.apache.paimon.mergetree.Levels;
import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.operation.metrics.MetricUtils;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Preconditions;

import org.slf4j.Logger;
Expand Down Expand Up @@ -72,6 +74,9 @@ public class MergeTreeCompactManager extends CompactFutureManager {

@Nullable private final RecordLevelExpire recordLevelExpire;

@Nullable private String partitionString;
private int partitionBucket = -1;

public MergeTreeCompactManager(
ExecutorService executor,
Levels levels,
Expand Down Expand Up @@ -105,6 +110,19 @@ public MergeTreeCompactManager(
MetricUtils.safeCall(this::reportMetrics, LOG);
}

/**
 * Remembers a readable partition string and the bucket id of this manager so that they can be
 * handed to compact tasks for their log messages.
 *
 * <p>The partition string is resolved through {@code pathFactory}; if that lookup throws, the
 * plain {@code toString()} of the partition row is used instead.
 *
 * @param partition partition row this manager works on
 * @param bucket bucket id this manager works on
 * @param pathFactory factory used to render the partition row as a string
 */
public void setPartitionBucketInfo(
        BinaryRow partition, int bucket, FileStorePathFactory pathFactory) {
    String readablePartition;
    try {
        readablePartition = pathFactory.getPartitionString(partition);
    } catch (Exception ignored) {
        // Best effort: this value is only used for logging, so fall back to the raw row text.
        readablePartition = partition.toString();
    }
    this.partitionString = readablePartition;
    this.partitionBucket = bucket;
}

@Override
public boolean shouldWaitForLatestCompaction() {
return levels.numberOfSortedRuns() > numSortedRunStopTrigger;
Expand Down Expand Up @@ -244,6 +262,7 @@ private void submitCompaction(CompactUnit unit, boolean dropDelete) {
file.fileName(), file.level(), file.fileSize()))
.collect(Collectors.joining(", ")));
}
task.setPartitionBucketInfo(partitionString, partitionBucket);
taskFuture = executor.submit(task);
if (metricsReporter != null) {
metricsReporter.increaseCompactionsQueuedCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,21 +171,25 @@ public CompactManager create(
if (metricsReporter != null) {
rewriter.setMetricsReporter(metricsReporter);
}
return new MergeTreeCompactManager(
compactExecutor,
levels,
compactStrategy,
keyComparator,
options.compactionFileSize(true),
options.numSortedRunStopTrigger(),
rewriter,
metricsReporter,
dvMaintainer,
options.prepareCommitWaitCompaction(),
options.needLookup(),
recordLevelExpire,
options.forceRewriteAllFiles(),
options.isChainTable());
MergeTreeCompactManager compactManager =
new MergeTreeCompactManager(
compactExecutor,
levels,
compactStrategy,
keyComparator,
options.compactionFileSize(true),
options.numSortedRunStopTrigger(),
rewriter,
metricsReporter,
dvMaintainer,
options.prepareCommitWaitCompaction(),
options.needLookup(),
recordLevelExpire,
options.forceRewriteAllFiles(),
options.isChainTable());
compactManager.setPartitionBucketInfo(
partition, bucket, readerFactoryBuilder.pathFactory());
return compactManager;
}

private CompactStrategy createCompactStrategy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,27 +496,28 @@ public FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry) {
}

private RestoreFiles scanExistingFileMetas(BinaryRow partition, int bucket) {
RestoreFiles restored =
restore.restoreFiles(
partition,
bucket,
dbMaintainerFactory != null,
dvMaintainerFactory != null);
String partInfo = partitionInfo(partition);
RestoreFiles restored;
try {
restored =
restore.restoreFiles(
partition,
bucket,
dbMaintainerFactory != null,
dvMaintainerFactory != null);
} catch (RuntimeException e) {
throw new RuntimeException(
String.format(
"Failed to restore existing files for %s, bucket %d.",
partInfo, bucket),
e);
}
Integer restoredTotalBuckets = restored.totalBuckets();
int totalBuckets = numBuckets;
if (restoredTotalBuckets != null) {
totalBuckets = restoredTotalBuckets;
}
if (!ignoreNumBucketCheck && totalBuckets != numBuckets) {
String partInfo =
partitionType.getFieldCount() > 0
? "partition "
+ getPartitionComputer(
partitionType,
PARTITION_DEFAULT_NAME.defaultValue(),
legacyPartitionName)
.generatePartValues(partition)
: "table";
throw new RuntimeException(
String.format(
"Try to write %s with a new bucket num %d, but the previous bucket num is %d. "
Expand All @@ -526,6 +527,17 @@ private RestoreFiles scanExistingFileMetas(BinaryRow partition, int bucket) {
return restored;
}

/**
 * Builds the short description of a partition that is embedded in error messages.
 *
 * @param partition partition row to describe
 * @return {@code "table"} when the partition type has no fields, otherwise {@code "partition "}
 *     followed by the generated partition values
 */
private String partitionInfo(BinaryRow partition) {
    if (partitionType.getFieldCount() <= 0) {
        // No partition fields: refer to the table as a whole.
        return "table";
    }
    return "partition "
            + getPartitionComputer(
                            partitionType,
                            PARTITION_DEFAULT_NAME.defaultValue(),
                            legacyPartitionName)
                    .generatePartValues(partition);
}

private ExecutorService compactExecutor() {
if (lazyCompactExecutor == null) {
lazyCompactExecutor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,20 @@ RestoreFiles restoreFiles(

/**
 * Convenience overload of {@link #extractDataFiles(List, List, String)} that passes {@code null}
 * as the context, i.e. no extra context is supplied for the error message.
 *
 * @param entries manifest entries to read from
 * @param dataFiles list handed to the three-argument overload to receive the data files
 * @return the value returned by the three-argument overload; may be {@code null}
 */
@Nullable
static Integer extractDataFiles(List<ManifestEntry> entries, List<DataFileMeta> dataFiles) {
return extractDataFiles(entries, dataFiles, null);
}

@Nullable
static Integer extractDataFiles(
List<ManifestEntry> entries, List<DataFileMeta> dataFiles, @Nullable String context) {
Integer totalBuckets = null;
for (ManifestEntry entry : entries) {
if (totalBuckets != null && totalBuckets != entry.totalBuckets()) {
String contextInfo = context == null ? "" : " for " + context;
throw new RuntimeException(
String.format(
"Bucket data files has different total bucket number, %s vs %s, this should be a bug.",
totalBuckets, entry.totalBuckets()));
"Bucket data files%s has different total bucket number, %s vs %s, this should be a bug.",
contextInfo, totalBuckets, entry.totalBuckets()));
}
totalBuckets = entry.totalBuckets();
dataFiles.add(entry.file());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.WriteRestore;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.FileStorePathFactory;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
Expand All @@ -42,6 +43,7 @@
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.utils.InstantiationUtil.deserializeObject;
import static org.apache.paimon.utils.InstantiationUtil.serializeObject;
Expand Down Expand Up @@ -169,7 +171,11 @@ public synchronized ScanCoordinationResponse scan(ScanCoordinationRequest reques

List<DataFileMeta> restoreFiles = new ArrayList<>();
List<ManifestEntry> entries = scan.withPartitionBucket(partition, bucket).plan().files();
Integer totalBuckets = WriteRestore.extractDataFiles(entries, restoreFiles);
Integer totalBuckets =
WriteRestore.extractDataFiles(
entries,
restoreFiles,
String.format("%s, bucket %d", partitionInfo(partition), bucket));

IndexFileMeta dynamicBucketIndex = null;
if (request.scanDynamicBucketIndex()) {
Expand Down Expand Up @@ -212,6 +218,19 @@ public void checkpoint() {
latestCommittedIdentifiers.clear();
}

/**
 * Builds the short description of a partition that is passed along as error-message context.
 *
 * @param partition partition row to describe
 * @return {@code "table"} when the table's logical partition type has no fields, otherwise
 *     {@code "partition "} followed by the generated partition values
 */
private String partitionInfo(BinaryRow partition) {
    if (table.schema().logicalPartitionType().getFieldCount() != 0) {
        return "partition "
                + FileStorePathFactory.getPartitionComputer(
                                table.schema().logicalPartitionType(),
                                table.coreOptions().toConfiguration().get(PARTITION_DEFAULT_NAME),
                                table.coreOptions().legacyPartitionName())
                        .generatePartValues(partition);
    }
    // No partition fields: refer to the table as a whole.
    return "table";
}

private static class CoordinationKey {

private final byte[] content;
Expand Down
Loading