Skip to content

Commit 54545a9

Browse files
authored
[python] Fix Ray read_paimon dropping nested projection (reads nested leaves as NULL) (#8269)
1 parent 5fde0cd commit 54545a9

5 files changed

Lines changed: 92 additions & 2 deletions

File tree

paimon-python/pypaimon/read/datasource/ray_datasource.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ def get_read_tasks(self, parallelism: int, **kwargs) -> List:
124124
table = self._split_provider.table()
125125
predicate = self._split_provider.predicate()
126126
read_type = self._split_provider.read_type()
127+
nested_name_paths = self._split_provider.nested_name_paths()
127128
splits = self._split_provider.splits()
128129
limit = self._split_provider.limit()
129130
if not splits:
@@ -148,11 +149,17 @@ def _get_read_task(
148149
read_type=read_type,
149150
schema=schema,
150151
limit=limit,
152+
nested_name_paths=nested_name_paths,
151153
) -> Iterable[pyarrow.Table]:
152154
"""Read function that will be executed by Ray workers."""
153155
from pypaimon.read.table_read import TableRead
156+
# nested_name_paths must be forwarded so a nested-leaf projection
157+
# widens to the parent struct and extracts the leaves; without it
158+
# the worker treats the flattened leaf names as missing top-level
159+
# columns and reads every projected leaf as NULL.
154160
worker_table_read = TableRead(
155-
table, predicate, read_type, limit=limit)
161+
table, predicate, read_type, limit=limit,
162+
nested_name_paths=nested_name_paths)
156163

157164
batch_reader = worker_table_read.to_arrow_batch_reader(splits)
158165
has_data = False
@@ -179,6 +186,7 @@ def _get_read_task(
179186
read_type=read_type,
180187
schema=schema,
181188
limit=limit,
189+
nested_name_paths=nested_name_paths,
182190
)
183191

184192
read_tasks = []

paimon-python/pypaimon/read/datasource/split_provider.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,17 @@ def limit(self) -> Optional[int]:
6767
"""
6868
return None
6969

70+
def nested_name_paths(self) -> Optional[List[List[str]]]:
71+
"""Parallel name paths for a nested-leaf projection, or ``None``.
72+
73+
Forwarded to the per-task ``TableRead`` so a projection like
74+
``['mv.latest_value.x']`` is read by widening to the parent struct and
75+
extracting the requested leaves. Without it the worker treats the
76+
flattened leaf names as missing top-level columns and reads every
77+
projected leaf as NULL.
78+
"""
79+
return None
80+
7081

7182
class CatalogSplitProvider(SplitProvider):
7283
"""Plan splits from a fully-qualified table identifier and catalog options.
@@ -124,6 +135,7 @@ def __init__(
124135
self._table_cached = None
125136
self._splits_cached = None
126137
self._read_type_cached = None
138+
self._nested_name_paths_cached = None
127139

128140
def _ensure_table(self):
129141
if self._table_cached is None:
@@ -154,6 +166,7 @@ def _ensure_planned(self):
154166
if self._limit is not None:
155167
rb = rb.with_limit(self._limit)
156168
self._read_type_cached = rb.read_type()
169+
self._nested_name_paths_cached = rb._nested_name_paths()
157170
self._splits_cached = rb.new_scan().plan().splits()
158171

159172
@property
@@ -171,6 +184,10 @@ def read_type(self):
171184
self._ensure_planned()
172185
return self._read_type_cached
173186

187+
def nested_name_paths(self) -> Optional[List[List[str]]]:
188+
self._ensure_planned()
189+
return self._nested_name_paths_cached
190+
174191
def predicate(self):
175192
return self._predicate
176193

@@ -190,12 +207,13 @@ class PreResolvedSplitProvider(SplitProvider):
190207
"""
191208

192209
def __init__(self, table, splits: List[Split], read_type, predicate=None,
193-
limit: Optional[int] = None):
210+
limit: Optional[int] = None, nested_name_paths=None):
194211
self._table = table
195212
self._splits = splits
196213
self._read_type = read_type
197214
self._predicate = predicate
198215
self._limit = limit
216+
self._nested_name_paths = nested_name_paths
199217

200218
def table(self):
201219
return self._table
@@ -206,6 +224,9 @@ def splits(self) -> List[Split]:
206224
def read_type(self):
207225
return self._read_type
208226

227+
def nested_name_paths(self) -> Optional[List[List[str]]]:
228+
return self._nested_name_paths
229+
209230
def predicate(self):
210231
return self._predicate
211232

paimon-python/pypaimon/read/table_read.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,7 @@ def to_ray(
530530
read_type=self.read_type,
531531
predicate=self.predicate,
532532
limit=self.limit,
533+
nested_name_paths=self.nested_name_paths,
533534
)
534535
)
535536
ds = ray.data.read_datasource(

paimon-python/pypaimon/tests/ray_data_test.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -834,6 +834,41 @@ def process_blob(batch):
834834
"Blob data column should match"
835835
)
836836

837+
def test_to_ray_with_nested_projection(self):
838+
"""to_ray() respects a nested-leaf projection.
839+
840+
Sibling of the read_paimon() nested-projection test: this exercises
841+
the PreResolvedSplitProvider entry point (TableRead.to_ray), which
842+
must also forward nested_name_paths to the worker TableRead. Without
843+
it the worker treats the flattened leaf name as a missing top-level
844+
column and reads the projected leaf as NULL.
845+
"""
846+
inner = pa.struct([('a', pa.int64()), ('b', pa.string())])
847+
pa_schema = pa.schema([('id', pa.int64()), ('payload', inner)])
848+
schema = Schema.from_pyarrow_schema(pa_schema)
849+
self.catalog.create_table('default.test_ray_nested_proj', schema, False)
850+
table = self.catalog.get_table('default.test_ray_nested_proj')
851+
852+
write_builder = table.new_batch_write_builder()
853+
writer = write_builder.new_write()
854+
writer.write_arrow(pa.Table.from_pylist(
855+
[{'id': 1, 'payload': {'a': 10, 'b': 'x'}},
856+
{'id': 2, 'payload': {'a': 20, 'b': 'y'}}],
857+
schema=pa_schema))
858+
commit = write_builder.new_commit()
859+
commit.commit(writer.prepare_commit())
860+
writer.close()
861+
862+
read_builder = table.new_read_builder().with_projection(['id', 'payload.a'])
863+
table_read = read_builder.new_read()
864+
splits = read_builder.new_scan().plan().splits()
865+
866+
ray_dataset = table_read.to_ray(splits, override_num_blocks=1)
867+
rows = {r['id']: r for r in ray_dataset.take_all()}
868+
self.assertEqual(set(rows.keys()), {1, 2})
869+
self.assertEqual(rows[1]['payload_a'], 10)
870+
self.assertEqual(rows[2]['payload_a'], 20)
871+
837872

838873
if __name__ == '__main__':
839874
unittest.main()

paimon-python/pypaimon/tests/ray_integration_test.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,31 @@ def test_read_paimon_with_projection(self):
119119
self.assertEqual(set(df.columns), {'id', 'name'})
120120
self.assertEqual(len(df), 2)
121121

122+
def test_read_paimon_with_nested_projection(self):
123+
"""read_paimon() respects a nested-leaf projection.
124+
125+
Regression for the worker-side TableRead being rebuilt without
126+
nested_name_paths: a projection like ['payload.a'] used to read every
127+
nested leaf as NULL because the worker treated the flattened leaf name
128+
as a missing top-level column.
129+
"""
130+
from pypaimon.ray import read_paimon
131+
132+
inner = pa.struct([('a', pa.int64()), ('b', pa.string())])
133+
pa_schema = pa.schema([('id', pa.int32()), ('payload', inner)])
134+
identifier = self._create_and_populate_table(
135+
'test_read_nested_proj', pa_schema,
136+
{'id': [1, 2],
137+
'payload': [{'a': 10, 'b': 'x'}, {'a': 20, 'b': 'y'}]},
138+
)
139+
140+
ds = read_paimon(identifier, self.catalog_options,
141+
projection=['id', 'payload.a'])
142+
rows = {r['id']: r for r in ds.take_all()}
143+
self.assertEqual(set(rows.keys()), {1, 2})
144+
self.assertEqual(rows[1]['payload_a'], 10)
145+
self.assertEqual(rows[2]['payload_a'], 20)
146+
122147
def test_read_paimon_with_filter(self):
123148
"""read_paimon() pushes down a predicate filter."""
124149
from pypaimon.ray import read_paimon

0 commit comments

Comments
 (0)