Skip to content

Commit 727a45c

Browse files
authored
fix: add namespace support to the Java SDK's dataset-based CommitBuilder (#6257)
Currently, the namespace and table ID settings are not available when a CommitBuilder is initialized from a Dataset through the Java SDK (although they are available in the URI-based variant). This is problematic in scenarios where we need to change the CommitHandler behavior. For example, when managed versioning is disabled on a REST namespace implementation, we override the CommitHandler to commit directly to the object store rather than routing through the REST namespace.
1 parent 88c627b commit 727a45c

4 files changed

Lines changed: 176 additions & 3 deletions

File tree

java/lance-jni/src/blocking_dataset.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,7 @@ impl BlockingDataset {
341341
storage_format: Option<LanceFileVersion>,
342342
max_retries: u32,
343343
skip_auto_cleanup: bool,
344+
commit_handler: Option<Arc<dyn CommitHandler>>,
344345
) -> Result<Self> {
345346
let mut builder = CommitBuilder::new(Arc::new(self.clone().inner))
346347
.with_store_params(store_params)
@@ -358,6 +359,9 @@ impl BlockingDataset {
358359
if skip_auto_cleanup {
359360
builder = builder.with_skip_auto_cleanup(true);
360361
}
362+
if let Some(handler) = commit_handler {
363+
builder = builder.with_commit_handler(handler);
364+
}
361365
let new_dataset = RT.block_on(builder.execute(transaction))?;
362366
Ok(BlockingDataset { inner: new_dataset })
363367
}

java/lance-jni/src/transaction.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -623,6 +623,8 @@ pub extern "system" fn Java_org_lance_CommitBuilder_nativeCommitToDataset<'local
623623
storage_format_obj: JObject,
624624
max_retries: jint,
625625
skip_auto_cleanup: jboolean,
626+
namespace_obj: JObject,
627+
table_id_obj: JObject,
626628
) -> JObject<'local> {
627629
ok_or_throw!(
628630
env,
@@ -637,6 +639,8 @@ pub extern "system" fn Java_org_lance_CommitBuilder_nativeCommitToDataset<'local
637639
storage_format_obj,
638640
max_retries as u32,
639641
skip_auto_cleanup != 0,
642+
namespace_obj,
643+
table_id_obj,
640644
)
641645
)
642646
}
@@ -653,6 +657,8 @@ fn inner_commit_to_dataset<'local>(
653657
storage_format_obj: JObject,
654658
max_retries: u32,
655659
skip_auto_cleanup: bool,
660+
namespace_obj: JObject,
661+
table_id_obj: JObject,
656662
) -> Result<JObject<'local>> {
657663
let write_param = if write_params_obj.is_null() {
658664
HashMap::new()
@@ -745,6 +751,15 @@ fn inner_commit_to_dataset<'local>(
745751
Some(&mut java_blocking_ds),
746752
)?;
747753

754+
// Set namespace commit handler if provided
755+
let namespace_info = extract_namespace_info(env, &namespace_obj, &table_id_obj)?;
756+
let commit_handler = namespace_info.map(|(ns, tid)| {
757+
let external_store = LanceNamespaceExternalManifestStore::new(ns, tid);
758+
Arc::new(ExternalManifestCommitHandler {
759+
external_manifest_store: Arc::new(external_store),
760+
}) as Arc<dyn CommitHandler>
761+
});
762+
748763
let new_blocking_ds = {
749764
let mut dataset_guard =
750765
unsafe { env.get_rust_field::<_, _, BlockingDataset>(&java_dataset, NATIVE_DATASET) }?;
@@ -757,6 +772,7 @@ fn inner_commit_to_dataset<'local>(
757772
storage_format,
758773
max_retries,
759774
skip_auto_cleanup,
775+
commit_handler,
760776
)?
761777
};
762778
new_blocking_ds.into_java(env)

java/src/main/java/org/lance/CommitBuilder.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,9 @@ public CommitBuilder storageOptionsProvider(StorageOptionsProvider provider) {
124124
}
125125

126126
/**
127-
* Set the namespace for managed versioning during URI-based commits.
127+
* Set the namespace for managed versioning. When set, commits are routed through the namespace's
128+
* {@code createTableVersion} API instead of writing directly to the object store. This is
129+
* supported for both dataset-based and URI-based commits.
128130
*
129131
* @param namespace the LanceNamespace instance
130132
* @return this builder instance
@@ -250,7 +252,9 @@ public Dataset execute(Transaction transaction) {
250252
useStableRowIds,
251253
storageFormat,
252254
maxRetries,
253-
skipAutoCleanup);
255+
skipAutoCleanup,
256+
namespace,
257+
tableId);
254258
result.setAllocator(dataset.allocator());
255259
return result;
256260
}
@@ -285,7 +289,9 @@ private static native Dataset nativeCommitToDataset(
285289
Boolean useStableRowIds,
286290
String storageFormat,
287291
int maxRetries,
288-
boolean skipAutoCleanup);
292+
boolean skipAutoCleanup,
293+
Object namespace,
294+
Object tableId);
289295

290296
private static native Dataset nativeCommitToUri(
291297
String uri,

java/src/test/java/org/lance/namespace/DirectoryNamespaceTest.java

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,17 @@
1313
*/
1414
package org.lance.namespace;
1515

16+
import org.lance.CommitBuilder;
1617
import org.lance.Dataset;
18+
import org.lance.Fragment;
19+
import org.lance.FragmentMetadata;
1720
import org.lance.ReadOptions;
21+
import org.lance.Transaction;
1822
import org.lance.WriteParams;
1923
import org.lance.namespace.model.*;
2024
import org.lance.namespace.model.DescribeTableVersionRequest;
2125
import org.lance.namespace.model.DescribeTableVersionResponse;
26+
import org.lance.operation.Append;
2227

2328
import org.apache.arrow.memory.BufferAllocator;
2429
import org.apache.arrow.memory.RootAllocator;
@@ -673,6 +678,148 @@ public VectorSchemaRoot getVectorSchemaRoot() {
673678
}
674679
}
675680

681+
@Test
682+
void testDatasetBasedCommitBuilderWithNamespace(@TempDir Path managedVersioningTempDir)
683+
throws Exception {
684+
try (BufferAllocator allocator = new RootAllocator()) {
685+
TableVersionTrackingNamespace namespace =
686+
new TableVersionTrackingNamespace(managedVersioningTempDir);
687+
String tableName = "test_table";
688+
List<String> tableId = Arrays.asList(tableName);
689+
690+
Schema schema =
691+
new Schema(
692+
Arrays.asList(
693+
new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null),
694+
new Field("name", FieldType.nullable(new ArrowType.Utf8()), null)));
695+
696+
// Create initial dataset through namespace using WriteDatasetBuilder
697+
try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
698+
IntVector idVector = (IntVector) root.getVector("id");
699+
VarCharVector nameVector = (VarCharVector) root.getVector("name");
700+
701+
idVector.allocateNew(2);
702+
nameVector.allocateNew(2);
703+
idVector.set(0, 1);
704+
idVector.set(1, 2);
705+
nameVector.set(0, "Alice".getBytes());
706+
nameVector.set(1, "Bob".getBytes());
707+
idVector.setValueCount(2);
708+
nameVector.setValueCount(2);
709+
root.setRowCount(2);
710+
711+
ArrowReader reader =
712+
new ArrowReader(allocator) {
713+
boolean firstRead = true;
714+
715+
@Override
716+
public boolean loadNextBatch() {
717+
if (firstRead) {
718+
firstRead = false;
719+
return true;
720+
}
721+
return false;
722+
}
723+
724+
@Override
725+
public long bytesRead() {
726+
return 0;
727+
}
728+
729+
@Override
730+
protected void closeReadSource() {}
731+
732+
@Override
733+
protected Schema readSchema() {
734+
return schema;
735+
}
736+
737+
@Override
738+
public VectorSchemaRoot getVectorSchemaRoot() {
739+
return root;
740+
}
741+
};
742+
743+
try (Dataset dataset =
744+
Dataset.write()
745+
.allocator(allocator)
746+
.reader(reader)
747+
.namespace(namespace)
748+
.tableId(tableId)
749+
.mode(WriteParams.WriteMode.CREATE)
750+
.execute()) {
751+
assertEquals(2, dataset.countRows());
752+
assertEquals(1, dataset.version());
753+
}
754+
}
755+
756+
// Verify initial create used createTableVersion once
757+
assertEquals(
758+
1,
759+
namespace.getCreateTableVersionCount(),
760+
"create_table_version should be called once during CREATE");
761+
762+
// Open dataset through namespace (returns dataset with managed versioning)
763+
Dataset existingDataset =
764+
Dataset.open().allocator(allocator).namespace(namespace).tableId(tableId).build();
765+
766+
// Get the dataset URI for Fragment.create()
767+
String datasetUri = existingDataset.uri();
768+
769+
// Create a new fragment independently (simulating Spark worker behavior)
770+
List<FragmentMetadata> fragments;
771+
try (VectorSchemaRoot appendRoot = VectorSchemaRoot.create(schema, allocator)) {
772+
IntVector idVector = (IntVector) appendRoot.getVector("id");
773+
VarCharVector nameVector = (VarCharVector) appendRoot.getVector("name");
774+
775+
idVector.allocateNew(2);
776+
nameVector.allocateNew(2);
777+
idVector.set(0, 3);
778+
idVector.set(1, 4);
779+
nameVector.set(0, "Charlie".getBytes());
780+
nameVector.set(1, "Diana".getBytes());
781+
idVector.setValueCount(2);
782+
nameVector.setValueCount(2);
783+
appendRoot.setRowCount(2);
784+
785+
fragments =
786+
Fragment.create(datasetUri, allocator, appendRoot, new WriteParams.Builder().build());
787+
}
788+
789+
// Commit using dataset-based CommitBuilder WITH namespace (the new path)
790+
int createCountBefore = namespace.getCreateTableVersionCount();
791+
try (Transaction txn =
792+
new Transaction.Builder()
793+
.readVersion(existingDataset.version())
794+
.operation(Append.builder().fragments(fragments).build())
795+
.build();
796+
Dataset committed =
797+
new CommitBuilder(existingDataset)
798+
.namespace(namespace)
799+
.tableId(tableId)
800+
.execute(txn)) {
801+
assertEquals(2, committed.version());
802+
assertEquals(4, committed.countRows());
803+
}
804+
805+
// Verify createTableVersion was called for the dataset-based commit
806+
assertEquals(
807+
createCountBefore + 1,
808+
namespace.getCreateTableVersionCount(),
809+
"create_table_version should be called for dataset-based CommitBuilder with namespace");
810+
811+
// Verify the data is accessible through namespace
812+
try (Dataset latestDs =
813+
Dataset.open().allocator(allocator).namespace(namespace).tableId(tableId).build()) {
814+
assertEquals(4, latestDs.countRows());
815+
assertEquals(2, latestDs.version());
816+
}
817+
818+
existingDataset.close();
819+
namespace.close();
820+
}
821+
}
822+
676823
@Test
677824
void testConcurrentCreateAndDropWithSingleInstance() throws Exception {
678825
// Initialize namespace first - create parent namespace to ensure __manifest table

0 commit comments

Comments
 (0)