Skip to content

Commit 662da22

Browse files
committed
perf: add prefetch and parallel column decode for parquet reading
Add PrefetchingRecordBatchReader that decodes the next RecordBatch on a background thread while CSP processes the current batch. Also enable Arrow's use_threads and pre_buffer for parallel column decoding and IO range caching. The PrefetchingRecordBatchReader co-owns the FileReader (via shared_ptr) to guarantee the FileReader outlives the background prefetch thread, even when CSP stops mid-file. Benchmarks show ~15% average speedup, up to 1.5x on filtered reads and wide structs, with no regressions. Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
1 parent c72a9d8 commit 662da22

2 files changed

Lines changed: 120 additions & 8 deletions

File tree

cpp/csp/python/adapters/parquetadapterimpl.cpp

Lines changed: 83 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
#include <arrow/io/file.h>
2424
#include <parquet/arrow/reader.h>
2525
#include <parquet/file_reader.h>
26+
#include <parquet/properties.h>
2627
#include <csp/engine/PartialSwitchCspType.h>
2728
#include <filesystem>
29+
#include <future>
2830
#include <locale>
2931
#include <codecvt>
3032
#include <numeric>
@@ -190,6 +192,75 @@ class PyRecordBatchStreamSource : public csp::adapters::parquet::RecordBatchStre
190192
ColumnReaderMap m_columnReaders;
191193
};
192194

195+
// Wraps a RecordBatchReader to prefetch the next batch on a background thread.
196+
// This overlaps Arrow decode (ReadNext) with CSP's per-row processing.
197+
class PrefetchingRecordBatchReader : public ::arrow::RecordBatchReader
198+
{
199+
public:
200+
PrefetchingRecordBatchReader( std::shared_ptr<::arrow::RecordBatchReader> inner,
201+
std::shared_ptr<::parquet::arrow::FileReader> fileReader )
202+
: m_inner( std::move( inner ) ), m_fileReader( std::move( fileReader ) ), m_eof( false )
203+
{
204+
// Kick off the first prefetch
205+
m_prefetch = std::async( std::launch::async, [this] { return readOne(); } );
206+
}
207+
208+
~PrefetchingRecordBatchReader() override
209+
{
210+
// Ensure the background task finishes before m_inner/m_fileReader are released
211+
if( m_prefetch.valid() )
212+
m_prefetch.wait();
213+
}
214+
215+
std::shared_ptr<::arrow::Schema> schema() const override
216+
{
217+
return m_inner -> schema();
218+
}
219+
220+
::arrow::Status ReadNext( std::shared_ptr<::arrow::RecordBatch> * batch ) override
221+
{
222+
if( m_eof )
223+
{
224+
*batch = nullptr;
225+
return ::arrow::Status::OK();
226+
}
227+
228+
// Get the prefetched result
229+
auto result = m_prefetch.get();
230+
if( !result.ok() )
231+
return result.status();
232+
233+
*batch = result.MoveValueUnsafe();
234+
235+
if( *batch == nullptr )
236+
{
237+
m_eof = true;
238+
}
239+
else
240+
{
241+
// Start prefetching the next batch
242+
m_prefetch = std::async( std::launch::async, [this] { return readOne(); } );
243+
}
244+
245+
return ::arrow::Status::OK();
246+
}
247+
248+
private:
249+
::arrow::Result<std::shared_ptr<::arrow::RecordBatch>> readOne()
250+
{
251+
std::shared_ptr<::arrow::RecordBatch> batch;
252+
auto status = m_inner -> ReadNext( &batch );
253+
if( !status.ok() )
254+
return status;
255+
return batch;
256+
}
257+
258+
std::shared_ptr<::arrow::RecordBatchReader> m_inner;
259+
std::shared_ptr<::parquet::arrow::FileReader> m_fileReader;
260+
std::future<::arrow::Result<std::shared_ptr<::arrow::RecordBatch>>> m_prefetch;
261+
bool m_eof;
262+
};
263+
193264
// Native C++ parquet reader — opens parquet files directly, bypassing Python.
194265
// Used for regular and split-column parquet. IPC/memory tables use PyRecordBatchStreamSource.
195266
class NativeParquetStreamSource : public csp::adapters::parquet::RecordBatchStreamSource
@@ -374,25 +445,29 @@ class NativeParquetStreamSource : public csp::adapters::parquet::RecordBatchStre
374445
return !m_columnReaders.empty();
375446
}
376447

377-
static std::unique_ptr<::parquet::arrow::FileReader> makeFileReader( const std::string & path )
448+
static std::shared_ptr<::parquet::arrow::FileReader> makeFileReader( const std::string & path )
378449
{
379450
auto fileResult = ::arrow::io::ReadableFile::Open( path );
380451
if( !fileResult.ok() )
381452
CSP_THROW( csp::ValueError, "Failed to open " << path << ": " << fileResult.status().ToString() );
382453

383454
auto parquetReader = ::parquet::ParquetFileReader::Open( fileResult.ValueUnsafe() );
384455

456+
::parquet::ArrowReaderProperties arrowProps;
457+
arrowProps.set_use_threads( true );
458+
arrowProps.set_pre_buffer( true );
459+
385460
std::unique_ptr<::parquet::arrow::FileReader> fileReader;
386461
auto status = ::parquet::arrow::FileReader::Make(
387-
::arrow::default_memory_pool(), std::move( parquetReader ), &fileReader );
462+
::arrow::default_memory_pool(), std::move( parquetReader ), arrowProps, &fileReader );
388463
if( !status.ok() )
389464
CSP_THROW( csp::ValueError, "Failed to create Arrow FileReader for " << path << ": " << status.ToString() );
390465

391466
return fileReader;
392467
}
393468

394469
static std::shared_ptr<::arrow::RecordBatchReader> getRecordBatchReader(
395-
const std::unique_ptr<::parquet::arrow::FileReader> & fileReader,
470+
const std::shared_ptr<::parquet::arrow::FileReader> & fileReader,
396471
const std::vector<int> & colIndices )
397472
{
398473
int numRG = fileReader -> num_row_groups();
@@ -408,8 +483,9 @@ class NativeParquetStreamSource : public csp::adapters::parquet::RecordBatchStre
408483
if( !result.ok() )
409484
CSP_THROW( csp::ValueError, "GetRecordBatchReader failed: " << result.status().ToString() );
410485

411-
// Convert unique_ptr → shared_ptr
412-
return std::shared_ptr<::arrow::RecordBatchReader>( std::move( result ).ValueUnsafe() );
486+
// Wrap in prefetching reader; it co-owns the FileReader to keep it alive
487+
auto inner = std::shared_ptr<::arrow::RecordBatchReader>( std::move( result ).ValueUnsafe() );
488+
return std::make_shared<PrefetchingRecordBatchReader>( std::move( inner ), fileReader );
413489
}
414490

415491
csp::python::PyObjectPtr m_filenameGen;
@@ -419,9 +495,8 @@ class NativeParquetStreamSource : public csp::adapters::parquet::RecordBatchStre
419495
std::vector<std::string> m_filenames;
420496
size_t m_fileIdx = 0;
421497
ColumnReaderMap m_columnReaders;
422-
// FileReaders must outlive their RecordBatchReaders
423-
std::vector<std::unique_ptr<::parquet::arrow::FileReader>> m_fileReaders;
424-
std::vector<std::unique_ptr<::parquet::arrow::FileReader>> m_prevFileReaders;
498+
std::vector<std::shared_ptr<::parquet::arrow::FileReader>> m_fileReaders;
499+
std::vector<std::shared_ptr<::parquet::arrow::FileReader>> m_prevFileReaders;
425500
};
426501

427502
}

csp/tests/adapters/test_parquet.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5727,6 +5727,43 @@ def g(file_names: object) -> csp.ts[int]:
57275727
got = [v[1] for v in result[0]]
57285728
self.assertEqual(got, [10, 20, 30, 40])
57295729

5730+
def test_partial_read_many_row_groups(self):
5731+
"""Partial read with many row groups exercises prefetch thread shutdown.
5732+
5733+
Regression test: when CSP stops mid-file, the PrefetchingRecordBatchReader
5734+
must cleanly shut down its background thread before the FileReader is released.
5735+
"""
5736+
start = datetime(2020, 1, 1)
5737+
n_rows = 50_000
5738+
row_group_size = 100 # 500 row groups
5739+
5740+
with tempfile.TemporaryDirectory(prefix="csp_unit_tests") as d:
5741+
path = os.path.join(d, "many_rg.parquet")
5742+
timestamps = [start + timedelta(seconds=i) for i in range(1, n_rows + 1)]
5743+
table = pyarrow.table(
5744+
{
5745+
"csp_timestamp": pyarrow.array(timestamps, type=pyarrow.timestamp("ns", tz="UTC")),
5746+
"value": pyarrow.array(range(n_rows), type=pyarrow.int64()),
5747+
}
5748+
)
5749+
pyarrow.parquet.write_table(table, path, row_group_size=row_group_size)
5750+
5751+
@csp.graph
5752+
def g() -> csp.ts[int]:
5753+
reader = ParquetReader(path, time_column="csp_timestamp")
5754+
return reader.subscribe_all(int, "value")
5755+
5756+
# Partial read: only consume ~17% of the file (first day of ~0.6 days worth)
5757+
end = start + timedelta(hours=2)
5758+
5759+
# Run multiple times to exercise the race condition
5760+
for _ in range(10):
5761+
result = csp.run(g, starttime=start, endtime=end)
5762+
ticks = [v[1] for v in result[0]]
5763+
self.assertEqual(len(ticks), 7200)
5764+
self.assertEqual(ticks[0], 0)
5765+
self.assertEqual(ticks[-1], 7199)
5766+
57305767

57315768
if __name__ == "__main__":
57325769
unittest.main()

0 commit comments

Comments
 (0)