Skip to content

Commit e2255e9

Browse files
author
wangzheyan
committed
feat(python): expose zonemap segment builds
1 parent e256207 commit e2255e9

2 files changed

Lines changed: 116 additions & 9 deletions

File tree

python/python/lance/dataset.py

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3074,6 +3074,38 @@ def _prepare_scalar_index_request(
30743074
else:
30753075
raise Exception("index_type must be str or IndexConfig")
30763076

3077+
@staticmethod
3078+
def _normalized_index_type(
3079+
index_type: Union[str, IndexConfig],
3080+
) -> str:
3081+
if isinstance(index_type, IndexConfig):
3082+
index_type = index_type.index_type
3083+
return index_type.upper()
3084+
3085+
@classmethod
3086+
def _is_segment_native_scalar_index_type(
3087+
cls,
3088+
index_type: Union[str, IndexConfig],
3089+
) -> bool:
3090+
return cls._normalized_index_type(index_type) in {
3091+
"BTREE",
3092+
"BITMAP",
3093+
"INVERTED",
3094+
"FTS",
3095+
"ZONEMAP",
3096+
}
3097+
3098+
@classmethod
3099+
def _requires_uncommitted_scalar_index(
3100+
cls,
3101+
index_type: Union[str, IndexConfig],
3102+
) -> bool:
3103+
return cls._normalized_index_type(index_type) in {
3104+
"BTREE",
3105+
"BITMAP",
3106+
"ZONEMAP",
3107+
}
3108+
30773109
def create_scalar_index(
30783110
self,
30793111
column: str,
@@ -3291,7 +3323,9 @@ def create_scalar_index(
32913323
column, index_type, kwargs
32923324
)
32933325

3294-
if fragment_ids is not None and logical_index_type in {"BTREE", "BITMAP"}:
3326+
if fragment_ids is not None and self._requires_uncommitted_scalar_index(
3327+
logical_index_type
3328+
):
32953329
raise ValueError(
32963330
f"{logical_index_type} distributed indexing uses "
32973331
"create_index_uncommitted(..., "
@@ -4004,7 +4038,8 @@ def create_index_uncommitted(
40044038
Create one segment without publishing it and return its metadata.
40054039
40064040
This is the public distributed-build API for vector, BTREE scalar,
4007-
canonical bitmap scalar, and INVERTED scalar index construction. Unlike
4041+
canonical bitmap scalar, INVERTED scalar, and ZONEMAP scalar index
4042+
construction. Unlike
40084043
:meth:`create_index`, this method does not publish the index into the
40094044
dataset manifest. Instead, it writes one segment under
40104045
``_indices/<segment_uuid>/`` and returns the resulting
@@ -4020,7 +4055,7 @@ def create_index_uncommitted(
40204055
4. commit the final segment list with
40214056
:meth:`commit_existing_index_segments`
40224057
4023-
BTREE, BITMAP and INVERTED segments may
4058+
BTREE, BITMAP, INVERTED, and ZONEMAP segments may
40244059
be merged with :meth:`merge_existing_index_segments` before commit.
40254060
Parameters are the same as :meth:`create_index`, with one additional
40264061
requirement:
@@ -4047,12 +4082,8 @@ def create_index_uncommitted(
40474082
Index
40484083
Metadata for the segment that was written by this call.
40494084
"""
4050-
is_scalar_segment_request = (
4051-
isinstance(index_type, str)
4052-
and index_type.upper() in {"BTREE", "BITMAP", "INVERTED", "FTS"}
4053-
) or (
4054-
isinstance(index_type, IndexConfig)
4055-
and index_type.index_type.upper() in {"BTREE", "BITMAP", "INVERTED", "FTS"}
4085+
is_scalar_segment_request = self._is_segment_native_scalar_index_type(
4086+
index_type
40564087
)
40574088
if is_scalar_segment_request:
40584089
if fragment_ids is None:

python/python/tests/test_scalar_index.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4054,6 +4054,82 @@ def test_bitmap_uncommitted_segments_can_be_committed_from_python(tmp_path):
40544054
)
40554055

40564056

4057+
def test_zonemap_fragment_ids_parameter_validation(tmp_path):
4058+
ds = generate_multi_fragment_dataset(
4059+
tmp_path, num_fragments=2, rows_per_fragment=100
4060+
)
4061+
4062+
fragment_ids = [fragment.fragment_id for fragment in ds.get_fragments()]
4063+
with pytest.raises(ValueError, match="create_index_uncommitted"):
4064+
ds.create_scalar_index(
4065+
column="id",
4066+
index_type="ZONEMAP",
4067+
fragment_ids=[fragment_ids[0]],
4068+
)
4069+
4070+
4071+
def test_zonemap_segment_merge_and_commit_from_python(tmp_path):
4072+
default_rows_per_zone = 8192
4073+
rows_per_fragment = default_rows_per_zone * 2 + 1
4074+
ds = generate_multi_fragment_dataset(
4075+
tmp_path, num_fragments=4, rows_per_fragment=rows_per_fragment
4076+
)
4077+
4078+
index_name = "id_zonemap_segments"
4079+
fragment_ids = [fragment.fragment_id for fragment in ds.get_fragments()]
4080+
staged_segments = [
4081+
ds.create_index_uncommitted(
4082+
column="id",
4083+
index_type="ZONEMAP",
4084+
name=index_name,
4085+
fragment_ids=[fragment_id],
4086+
)
4087+
for fragment_id in fragment_ids
4088+
]
4089+
4090+
assert len({segment.uuid for segment in staged_segments}) == len(staged_segments)
4091+
for segment, fragment_id in zip(staged_segments, fragment_ids):
4092+
files = segment.files
4093+
assert files is not None
4094+
assert segment.fragment_ids == {fragment_id}
4095+
assert any(file.path == "zonemap.lance" for file in files)
4096+
assert all(not file.path.startswith("part_") for file in files)
4097+
4098+
merged_segment = ds.merge_existing_index_segments(staged_segments)
4099+
merged_files = merged_segment.files
4100+
assert merged_files is not None
4101+
assert merged_segment.uuid not in {segment.uuid for segment in staged_segments}
4102+
assert merged_segment.fragment_ids == set(fragment_ids)
4103+
assert any(file.path == "zonemap.lance" for file in merged_files)
4104+
assert all(not file.path.startswith("part_") for file in merged_files)
4105+
4106+
ds = ds.commit_existing_index_segments(index_name, "id", [merged_segment])
4107+
descriptions = {index.name: index for index in ds.describe_indices()}
4108+
assert descriptions[index_name].index_type == "ZoneMap"
4109+
assert len(descriptions[index_name].segments) == 1
4110+
4111+
filter_expr = (
4112+
f"id >= {default_rows_per_zone + 8} AND id < {default_rows_per_zone + 108}"
4113+
)
4114+
without_index = ds.scanner(
4115+
filter=filter_expr,
4116+
columns=["id", "text"],
4117+
use_scalar_index=False,
4118+
).to_table()
4119+
with_index = ds.scanner(
4120+
filter=filter_expr,
4121+
columns=["id", "text"],
4122+
use_scalar_index=True,
4123+
).to_table()
4124+
4125+
assert with_index.num_rows == without_index.num_rows
4126+
assert with_index["id"].to_pylist() == without_index["id"].to_pylist()
4127+
assert (
4128+
"ScalarIndexQuery"
4129+
in ds.scanner(filter=filter_expr, use_scalar_index=True).explain_plan()
4130+
)
4131+
4132+
40574133
def test_merge_index_metadata_btree_soft_break(tmp_path):
40584134
ds = generate_multi_fragment_dataset(
40594135
tmp_path, num_fragments=2, rows_per_fragment=100

0 commit comments

Comments
 (0)