Skip to content

Commit 3bee09f

Browse files
authored
[core] Unify single-column global index writer (#8275)
This PR merges the previous singleton and parallel single-column global index writer APIs into one `GlobalIndexSingleColumnWriter` interface. Single-column index writers now receive the caller-provided shard-relative row id through `write(@nullable Object key, long relativeRowId)`.
1 parent 494a633 commit 3bee09f

30 files changed

Lines changed: 318 additions & 280 deletions

File tree

paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexParallelWriter.java renamed to paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexSingleColumnWriter.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,14 @@
2020

2121
import javax.annotation.Nullable;
2222

23-
/** Parallel Index writer for global index with relative row id (from 0 to rowCnt - 1). */
24-
public interface GlobalIndexParallelWriter extends GlobalIndexWriter {
23+
/** Index writer for single-column global index with relative row id (from 0 to rowCnt - 1). */
24+
public interface GlobalIndexSingleColumnWriter extends GlobalIndexWriter {
2525

2626
/**
27-
* Write the indexed key and its related localRowId to the index File. The input row id is
28-
* "local" which means it is calculated by the original row id minus the start row id of current
29-
* index range.
27+
* Write the indexed key and its related relative row id to the index file.
3028
*
3129
* @param key nullable index key
32-
* @param relativeRowId local row id calculated by {@code rowId - rangeStart}.
30+
* @param relativeRowId local row id calculated by {@code rowId - rangeStart}
3331
*/
3432
void write(@Nullable Object key, long relativeRowId);
3533
}

paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexSingletonWriter.java

Lines changed: 0 additions & 26 deletions
This file was deleted.

paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import java.util.List;
2222

23-
/** Parallel Index writer for global index with relative row id (from 0 to rowCnt - 1). */
23+
/** Index writer for global index. */
2424
public interface GlobalIndexWriter {
2525

2626
List<ResultEntry> finish();

paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexWriter.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@
2020

2121
import org.apache.paimon.compression.BlockCompressionFactory;
2222
import org.apache.paimon.fs.PositionOutputStream;
23-
import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
24-
import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
23+
import org.apache.paimon.globalindex.GlobalIndexSingleColumnWriter;
2524
import org.apache.paimon.globalindex.ResultEntry;
2625
import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
2726
import org.apache.paimon.memory.MemorySlice;
@@ -42,9 +41,9 @@
4241
import java.util.zip.CRC32;
4342

4443
/**
45-
* The {@link GlobalIndexSingletonWriter} implementation for BTree index. Note that users must keep
46-
* written keys monotonically incremental. All null keys are stored in a separate bitmap, which will
47-
* be serialized and appended to the file end on close. The layout is as below:
44+
* The {@link GlobalIndexSingleColumnWriter} implementation for BTree index. Note that users must
45+
* keep written keys monotonically incremental. All null keys are stored in a separate bitmap, which
46+
* will be serialized and appended to the file end on close. The layout is as below:
4847
*
4948
* <pre>
5049
* +-----------------------------------+------+
@@ -67,7 +66,7 @@
6766
* <p>For efficiency, we combine entries with the same keys and store a compact list of row ids for
6867
* each key.
6968
*/
70-
public class BTreeIndexWriter implements GlobalIndexParallelWriter {
69+
public class BTreeIndexWriter implements GlobalIndexSingleColumnWriter {
7170

7271
private final String fileName;
7372
private final PositionOutputStream out;

paimon-common/src/test/java/org/apache/paimon/globalindex/btree/AbstractIndexReaderTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@
2626
import org.apache.paimon.fs.PositionOutputStream;
2727
import org.apache.paimon.fs.local.LocalFileIO;
2828
import org.apache.paimon.globalindex.GlobalIndexIOMeta;
29-
import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
3029
import org.apache.paimon.globalindex.GlobalIndexReader;
3130
import org.apache.paimon.globalindex.GlobalIndexResult;
31+
import org.apache.paimon.globalindex.GlobalIndexSingleColumnWriter;
3232
import org.apache.paimon.globalindex.ResultEntry;
3333
import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
3434
import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
@@ -277,7 +277,7 @@ public void testInPredicate() throws Exception {
277277
protected abstract GlobalIndexReader prepareDataAndCreateReader() throws Exception;
278278

279279
protected GlobalIndexIOMeta writeData(List<Pair<Object, Long>> data) throws IOException {
280-
GlobalIndexParallelWriter indexWriter = globalIndexer.createWriter(fileWriter);
280+
GlobalIndexSingleColumnWriter indexWriter = globalIndexer.createWriter(fileWriter);
281281
for (Pair<Object, Long> pair : data) {
282282
indexWriter.write(pair.getKey(), pair.getValue());
283283
}

paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeIndexMetaTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.apache.paimon.fs.Path;
2424
import org.apache.paimon.fs.PositionOutputStream;
2525
import org.apache.paimon.fs.local.LocalFileIO;
26-
import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
26+
import org.apache.paimon.globalindex.GlobalIndexSingleColumnWriter;
2727
import org.apache.paimon.globalindex.ResultEntry;
2828
import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
2929
import org.apache.paimon.memory.MemorySliceOutput;
@@ -155,7 +155,7 @@ public PositionOutputStream newOutputStream(String fileName)
155155
new Path(new Path(tempPath.toUri()), fileName), true);
156156
}
157157
};
158-
GlobalIndexParallelWriter indexWriter =
158+
GlobalIndexSingleColumnWriter indexWriter =
159159
new BTreeGlobalIndexer(
160160
new DataField(
161161
1, "testField", new VarCharType(VarCharType.MAX_LENGTH)),

paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeThreadSafetyTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@
2323
import org.apache.paimon.fs.PositionOutputStream;
2424
import org.apache.paimon.fs.local.LocalFileIO;
2525
import org.apache.paimon.globalindex.GlobalIndexIOMeta;
26-
import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
2726
import org.apache.paimon.globalindex.GlobalIndexReader;
2827
import org.apache.paimon.globalindex.GlobalIndexResult;
28+
import org.apache.paimon.globalindex.GlobalIndexSingleColumnWriter;
2929
import org.apache.paimon.globalindex.ResultEntry;
3030
import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
3131
import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
@@ -353,7 +353,7 @@ private List<GlobalIndexIOMeta> writeMultipleFiles() throws IOException {
353353
}
354354

355355
private GlobalIndexIOMeta writeData(List<Pair<Object, Long>> subData) throws IOException {
356-
GlobalIndexParallelWriter indexWriter = globalIndexer.createWriter(fileWriter);
356+
GlobalIndexSingleColumnWriter indexWriter = globalIndexer.createWriter(fileWriter);
357357
for (Pair<Object, Long> pair : subData) {
358358
indexWriter.write(pair.getKey(), pair.getValue());
359359
}

paimon-common/src/test/java/org/apache/paimon/globalindex/btree/LazyFilteredBTreeIndexReaderTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020

2121
import org.apache.paimon.fs.Path;
2222
import org.apache.paimon.globalindex.GlobalIndexIOMeta;
23-
import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
2423
import org.apache.paimon.globalindex.GlobalIndexReader;
2524
import org.apache.paimon.globalindex.GlobalIndexResult;
25+
import org.apache.paimon.globalindex.GlobalIndexSingleColumnWriter;
2626
import org.apache.paimon.globalindex.ResultEntry;
2727
import org.apache.paimon.options.MemorySize;
2828
import org.apache.paimon.options.Options;
@@ -233,7 +233,7 @@ public void testConcurrentAccess() throws Exception {
233233

234234
private GlobalIndexIOMeta writeDataWithIndexer(
235235
BTreeGlobalIndexer indexer, List<Pair<Object, Long>> subData) throws IOException {
236-
GlobalIndexParallelWriter indexWriter = indexer.createWriter(fileWriter);
236+
GlobalIndexSingleColumnWriter indexWriter = indexer.createWriter(fileWriter);
237237
for (Pair<Object, Long> pair : subData) {
238238
indexWriter.write(pair.getKey(), pair.getValue());
239239
}

paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexReader.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public class TestFullTextGlobalIndexReader implements GlobalIndexReader {
5454
private final GlobalIndexIOMeta ioMeta;
5555

5656
private String[] documents;
57+
private long[] rowIds;
5758
private int count;
5859

5960
public TestFullTextGlobalIndexReader(
@@ -90,10 +91,10 @@ public CompletableFuture<Optional<ScoredGlobalIndexResult>> visitFullTextSearch(
9091
continue;
9192
}
9293
if (topK.size() < effectiveK) {
93-
topK.offer(new ScoredRow(i, score));
94+
topK.offer(new ScoredRow(rowIds[i], score));
9495
} else if (score > topK.peek().score) {
9596
topK.poll();
96-
topK.offer(new ScoredRow(i, score));
97+
topK.offer(new ScoredRow(rowIds[i], score));
9798
}
9899
}
99100

@@ -138,12 +139,14 @@ private void ensureLoaded() throws IOException {
138139

139140
// Read documents
140141
documents = new String[count];
142+
rowIds = new long[count];
141143
for (int i = 0; i < count; i++) {
142-
byte[] lenBytes = new byte[4];
143-
readFully(in, lenBytes);
144-
ByteBuffer lenBuf = ByteBuffer.wrap(lenBytes);
145-
lenBuf.order(ByteOrder.LITTLE_ENDIAN);
146-
int textLen = lenBuf.getInt();
144+
byte[] entryHeaderBytes = new byte[Long.BYTES + Integer.BYTES];
145+
readFully(in, entryHeaderBytes);
146+
ByteBuffer entryHeader = ByteBuffer.wrap(entryHeaderBytes);
147+
entryHeader.order(ByteOrder.LITTLE_ENDIAN);
148+
rowIds[i] = entryHeader.getLong();
149+
int textLen = entryHeader.getInt();
147150

148151
byte[] textBytes = new byte[textLen];
149152
readFully(in, textBytes);
@@ -170,6 +173,7 @@ private static void readFully(SeekableInputStream in, byte[] buf) throws IOExcep
170173
@Override
171174
public void close() throws IOException {
172175
documents = null;
176+
rowIds = null;
173177
}
174178

175179
// =================== unsupported predicate operations =====================

paimon-common/src/test/java/org/apache/paimon/globalindex/testfulltext/TestFullTextGlobalIndexWriter.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import org.apache.paimon.data.BinaryString;
2222
import org.apache.paimon.fs.PositionOutputStream;
23-
import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
23+
import org.apache.paimon.globalindex.GlobalIndexSingleColumnWriter;
2424
import org.apache.paimon.globalindex.ResultEntry;
2525
import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
2626

@@ -41,24 +41,27 @@
4141
* <pre>
4242
* [4 bytes] count (int)
4343
* For each document:
44+
* [8 bytes] relative row id (long)
4445
* [4 bytes] text length in bytes (int)
4546
* [N bytes] UTF-8 text
4647
* </pre>
4748
*/
48-
public class TestFullTextGlobalIndexWriter implements GlobalIndexSingletonWriter {
49+
public class TestFullTextGlobalIndexWriter implements GlobalIndexSingleColumnWriter {
4950

5051
private static final String FILE_NAME_PREFIX = "test-fulltext";
5152

5253
private final GlobalIndexFileWriter fileWriter;
53-
private final List<String> documents;
54+
private final List<Document> documents;
55+
private long rowCount;
5456

5557
public TestFullTextGlobalIndexWriter(GlobalIndexFileWriter fileWriter) {
5658
this.fileWriter = fileWriter;
5759
this.documents = new ArrayList<>();
5860
}
5961

6062
@Override
61-
public void write(Object fieldData) {
63+
public void write(Object fieldData, long relativeRowId) {
64+
rowCount++;
6265
if (fieldData == null) {
6366
throw new IllegalArgumentException("Text field data must not be null");
6467
}
@@ -72,7 +75,7 @@ public void write(Object fieldData) {
7275
throw new IllegalArgumentException(
7376
"Unsupported text type: " + fieldData.getClass().getName());
7477
}
75-
documents.add(text);
78+
documents.add(new Document(relativeRowId, text));
7679
}
7780

7881
@Override
@@ -91,20 +94,31 @@ public List<ResultEntry> finish() {
9194
out.write(header.array());
9295

9396
// Documents
94-
for (String doc : documents) {
95-
byte[] textBytes = doc.getBytes(StandardCharsets.UTF_8);
96-
ByteBuffer lenBuf = ByteBuffer.allocate(4);
97-
lenBuf.order(ByteOrder.LITTLE_ENDIAN);
98-
lenBuf.putInt(textBytes.length);
99-
out.write(lenBuf.array());
97+
for (Document doc : documents) {
98+
byte[] textBytes = doc.text.getBytes(StandardCharsets.UTF_8);
99+
ByteBuffer entryHeader = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
100+
entryHeader.order(ByteOrder.LITTLE_ENDIAN);
101+
entryHeader.putLong(doc.relativeRowId);
102+
entryHeader.putInt(textBytes.length);
103+
out.write(entryHeader.array());
100104
out.write(textBytes);
101105
}
102106
out.flush();
103107
}
104108

105-
return Collections.singletonList(new ResultEntry(fileName, documents.size(), null));
109+
return Collections.singletonList(new ResultEntry(fileName, rowCount, null));
106110
} catch (IOException e) {
107111
throw new RuntimeException("Failed to write test full-text index", e);
108112
}
109113
}
114+
115+
private static class Document {
116+
private final long relativeRowId;
117+
private final String text;
118+
119+
private Document(long relativeRowId, String text) {
120+
this.relativeRowId = relativeRowId;
121+
this.text = text;
122+
}
123+
}
110124
}

0 commit comments

Comments
 (0)