Skip to content

Commit a40d7a1

Browse files
committed
Replace Python generator protocol with Arrow C Stream Interface
Replace the per-batch Python generator protocol with the native Arrow C Stream Interface for parquet input. Instead of calling back into Python for every RecordBatch (GIL acquisition, tuple unpacking, PyCapsule import per batch), Python now constructs RecordBatchReader objects that C++ imports via ArrowArrayStream, pulling batches natively.

Key changes:
- Python: Stream factory functions return RecordBatchReader objects backed by pyarrow.dataset scanners (parquet) or pa.ipc.open_stream (IPC), enabling GIL-free batch consumption in C++.
- C++: New RecordBatchStreamSource abstract interface replaces Generator<RecordBatchWithFlag>. PyRecordBatchStreamSource imports readers via arrow::ImportRecordBatchReader(ArrowArrayStream*).
- C++: getNextBatch() pulls from RecordBatchReader::ReadNext() directly, only calling back to Python at stream boundaries (file/directory).
- C++: start() collects all needed columns before calling the factory, enabling column projection at the scanner level.
- Schema changes are detected by comparing schemas at stream boundaries.
- Basket readers are imported separately and pre-read into single batches.

Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
1 parent 827dc9e commit a40d7a1

10 files changed

Lines changed: 807 additions & 560 deletions

File tree

cpp/csp/adapters/arrow/RecordBatchRowProcessor.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,6 @@ class RecordBatchRowProcessor
6464
// Get dispatcher for a column (nullptr if not found).
6565
ColumnDispatcher * getDispatcher( const std::string & name );
6666

67-
int64_t numRows() const { return m_numRows; }
68-
int64_t currentRow() const { return m_currentRow; }
6967
bool hasMoreRows() const { return m_currentRow < m_numRows; }
7068

7169
// Schema from last setupFromSchema call.

cpp/csp/adapters/parquet/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ set(PARQUET_HEADER_FILES
1212
ParquetOutputFilenameAdapter.h
1313
ParquetStatusUtils.h
1414
ParquetWriter.h
15-
RecordBatchWithFlag.h
15+
RecordBatchStreamSource.h
1616
)
1717

1818
set(PARQUET_SOURCE_FILES

cpp/csp/adapters/parquet/ParquetInputAdapterManager.cpp

Lines changed: 130 additions & 205 deletions
Large diffs are not rendered by default.

cpp/csp/adapters/parquet/ParquetInputAdapterManager.h

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,9 @@
33

44
#include <csp/adapters/arrow/ColumnDispatcher.h>
55
#include <csp/adapters/arrow/RecordBatchRowProcessor.h>
6-
#include <csp/adapters/parquet/RecordBatchWithFlag.h>
6+
#include <csp/adapters/parquet/RecordBatchStreamSource.h>
77
#include <csp/adapters/utils/StructAdapterInfo.h>
88
#include <csp/adapters/utils/ValueDispatcher.h>
9-
#include <csp/core/Generator.h>
109
#include <csp/engine/AdapterManager.h>
1110
#include <csp/engine/Dictionary.h>
1211
#include <csp/engine/Struct.h>
@@ -24,10 +23,10 @@ namespace csp::adapters::parquet
2423
class ParquetInputAdapterManager final : public csp::AdapterManager
2524
{
2625
public:
27-
using RecordBatchGeneratorPtr = csp::Generator<RecordBatchWithFlag, csp::DateTime, csp::DateTime>::Ptr;
26+
using RecordBatchStreamSourcePtr = RecordBatchStreamSource::Ptr;
2827

2928
ParquetInputAdapterManager( csp::Engine *engine, const Dictionary &properties,
30-
RecordBatchGeneratorPtr rbGeneratorPtr );
29+
RecordBatchStreamSourcePtr streamSource );
3130

3231
~ParquetInputAdapterManager();
3332

@@ -73,7 +72,6 @@ class ParquetInputAdapterManager final : public csp::AdapterManager
7372
DictionaryPtr m_fieldMap;
7473
std::vector<FieldSetter> m_fieldSetters;
7574
ValueDispatcher m_valueDispatcher;
76-
bool m_needsReset = false;
7775

7876
void createFieldSetters( arrow::RecordBatchRowProcessor & processor,
7977
const std::shared_ptr<::arrow::Schema> & schema );
@@ -84,9 +82,12 @@ class ParquetInputAdapterManager final : public csp::AdapterManager
8482
struct DictBasketReaderRecord
8583
{
8684
std::string m_basketName;
85+
std::string m_basketSymbolColumn;
8786
arrow::ColumnDispatcher * m_valueCountDispatcher = nullptr;
8887
std::unique_ptr<arrow::RecordBatchRowProcessor> m_processor;
8988
std::vector<std::unique_ptr<StructSubscription>> m_structSubscriptions;
89+
90+
uint16_t getValueCount() const;
9091
};
9192

9293
using AdaptersBySymbol = std::unordered_map<utils::Symbol, AdaptersSingleSymbol>;
@@ -96,6 +97,10 @@ class ParquetInputAdapterManager final : public csp::AdapterManager
9697
// Returns false when no more data is available.
9798
bool getNextBatch();
9899

100+
// Collect all column names referenced by adapters in the given map.
101+
static void collectAdapterColumns( const AdaptersBySymbol & adaptersBySymbol,
102+
std::set<std::string> & columns );
103+
99104
// Set up a processor from the current batch schema and subscribe adapters.
100105
void setupProcessor( arrow::RecordBatchRowProcessor & processor,
101106
const std::shared_ptr<::arrow::Schema> & schema,
@@ -106,7 +111,12 @@ class ParquetInputAdapterManager final : public csp::AdapterManager
106111
// Subscribe adapters to a processor's dispatchers.
107112
void subscribeAdapters( arrow::RecordBatchRowProcessor & processor,
108113
const AdaptersBySymbol & adaptersBySymbol,
109-
bool subscribeAllOnEmptySymbol );
114+
bool subscribeAllOnEmptySymbol,
115+
std::vector<std::unique_ptr<StructSubscription>> & structSubscriptions );
116+
117+
// Set up a dict basket processor: setupFromSchema + subscribe adapters + bind batch.
118+
void setupBasketProcessor( DictBasketReaderRecord & record,
119+
const AdaptersBySymbol & adaptersBySymbol );
110120

111121
// Read the next row into the main processor (handles batch boundaries).
112122
bool readNextRow();
@@ -139,7 +149,7 @@ class ParquetInputAdapterManager final : public csp::AdapterManager
139149
csp::DateTime m_startTime;
140150
csp::DateTime m_endTime;
141151
csp::TimeDelta m_time_shift;
142-
RecordBatchGeneratorPtr m_rbGenerator;
152+
RecordBatchStreamSourcePtr m_streamSource;
143153
std::string m_symbolColumn;
144154
std::string m_timeColumn;
145155
std::string m_defaultTimezone;
@@ -151,6 +161,7 @@ class ParquetInputAdapterManager final : public csp::AdapterManager
151161

152162
// Processor-based reader state
153163
std::unique_ptr<arrow::RecordBatchRowProcessor> m_processor;
164+
std::shared_ptr<::arrow::RecordBatchReader> m_mainReader;
154165
std::shared_ptr<::arrow::RecordBatch> m_curBatch;
155166
std::shared_ptr<::arrow::Schema> m_curSchema;
156167
std::unordered_map<std::string, std::shared_ptr<::arrow::RecordBatch>> m_curBasketBatches;
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
#ifndef _IN_CSP_ADAPTERS_PARQUET_RecordBatchStreamSource_H
2+
#define _IN_CSP_ADAPTERS_PARQUET_RecordBatchStreamSource_H
3+
4+
#include <arrow/record_batch.h>
5+
#include <csp/core/Time.h>
6+
#include <memory>
7+
#include <set>
8+
#include <string>
9+
#include <unordered_map>
10+
11+
namespace csp::adapters::parquet
12+
{
13+
14+
// Abstract interface for streaming RecordBatch data from Python.
15+
// The Python implementation wraps a "stream factory" callable that yields
16+
// (RecordBatchReader, {basket: RecordBatchReader}) tuples.
17+
// C++ imports readers via ArrowArrayStream and pulls batches natively.
18+
class RecordBatchStreamSource
19+
{
20+
public:
21+
using Ptr = std::shared_ptr<RecordBatchStreamSource>;
22+
23+
virtual ~RecordBatchStreamSource() = default;
24+
25+
// Call the stream factory with (starttime, endtime, needed_columns).
26+
// Must be called before nextStream().
27+
virtual void init( csp::DateTime start, csp::DateTime end,
28+
const std::set<std::string> & neededColumns ) = 0;
29+
30+
// Advance to the next stream set (file/directory boundary).
31+
// After this call, mainReader() and basketBatches() reflect the new data.
32+
// Returns false when all data is exhausted.
33+
virtual bool nextStream() = 0;
34+
35+
// The current main RecordBatchReader. Pull batches with ReadNext().
36+
// Returns nullptr before the first nextStream() call.
37+
virtual std::shared_ptr<::arrow::RecordBatchReader> mainReader() = 0;
38+
39+
// Pre-read basket batches for the current stream boundary.
40+
// Empty when there are no dict baskets.
41+
virtual const std::unordered_map<std::string, std::shared_ptr<::arrow::RecordBatch>> & basketBatches() const = 0;
42+
};
43+
44+
} // namespace csp::adapters::parquet
45+
46+
#endif

cpp/csp/adapters/parquet/RecordBatchWithFlag.h

Lines changed: 0 additions & 21 deletions
This file was deleted.

cpp/csp/python/adapters/parquetadapterimpl.cpp

Lines changed: 77 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
#include <csp/adapters/parquet/ParquetOutputAdapterManager.h>
44
#include <csp/adapters/parquet/ParquetDictBasketOutputWriter.h>
55
#include <csp/adapters/parquet/ParquetStatusUtils.h>
6-
#include <csp/core/Generator.h>
76
#include <csp/engine/PushInputAdapter.h>
87
#include <csp/python/Conversions.h>
98
#include <csp/python/Exception.h>
@@ -81,30 +80,50 @@ REGISTER_CPPNODE( csp::cppnodes, parquet_dict_basket_writer );
8180
namespace
8281
{
8382

84-
// Generator that wraps a Python generator yielding (RecordBatch, dict, bool) tuples.
85-
// The dict maps basket names to RecordBatches; the bool indicates schema change.
86-
class RecordBatchGenerator : public csp::Generator<RecordBatchWithFlag, csp::DateTime, csp::DateTime>
83+
// Stream source that wraps a Python "stream factory" callable.
84+
// The factory signature: factory(starttime, endtime, needed_columns) -> iterator of (reader, basket_dict)
85+
// Each reader is a pyarrow.RecordBatchReader; basket_dict maps basket names to readers.
86+
// Readers are imported via ArrowArrayStream for GIL-free batch pulling in C++.
87+
class PyRecordBatchStreamSource : public csp::adapters::parquet::RecordBatchStreamSource
8788
{
8889
public:
89-
RecordBatchGenerator( PyObject *wrappedGenerator )
90-
: m_wrappedGenerator( csp::python::PyObjectPtr::incref( wrappedGenerator ) )
90+
PyRecordBatchStreamSource( PyObject *factory )
91+
: m_factory( csp::python::PyObjectPtr::incref( factory ) )
9192
{
9293
}
9394

94-
void init( csp::DateTime start, csp::DateTime end ) override
95+
void init( csp::DateTime start, csp::DateTime end,
96+
const std::set<std::string> & neededColumns ) override
9597
{
96-
auto tp = csp::python::PyObjectPtr::own( PyTuple_New( 2 ) );
98+
auto tp = csp::python::PyObjectPtr::own( PyTuple_New( 3 ) );
9799
if( !tp.get() )
98100
CSP_THROW( csp::python::PythonPassthrough, "" );
99101

100102
PyTuple_SET_ITEM( tp.get(), 0, csp::python::toPython( start ) );
101103
PyTuple_SET_ITEM( tp.get(), 1, csp::python::toPython( end ) );
102-
m_iter = csp::python::PyObjectPtr::check( PyObject_Call( m_wrappedGenerator.ptr(), tp.get(), nullptr ) );
104+
105+
if( neededColumns.empty() )
106+
{
107+
Py_INCREF( Py_None );
108+
PyTuple_SET_ITEM( tp.get(), 2, Py_None );
109+
}
110+
else
111+
{
112+
auto pyList = csp::python::PyObjectPtr::own( PyList_New( neededColumns.size() ) );
113+
if( !pyList.get() )
114+
CSP_THROW( csp::python::PythonPassthrough, "" );
115+
Py_ssize_t idx = 0;
116+
for( auto & col : neededColumns )
117+
PyList_SET_ITEM( pyList.get(), idx++, PyUnicode_FromStringAndSize( col.c_str(), col.size() ) );
118+
PyTuple_SET_ITEM( tp.get(), 2, pyList.release() );
119+
}
120+
121+
m_iter = csp::python::PyObjectPtr::check( PyObject_Call( m_factory.ptr(), tp.get(), nullptr ) );
103122
CSP_TRUE_OR_THROW( PyIter_Check( m_iter.ptr() ), csp::TypeError,
104-
"RecordBatch generator expected to return iterator" );
123+
"Stream factory expected to return iterator" );
105124
}
106125

107-
bool next( RecordBatchWithFlag &value ) override
126+
bool nextStream() override
108127
{
109128
if( m_iter.ptr() == nullptr )
110129
return false;
@@ -115,21 +134,18 @@ class RecordBatchGenerator : public csp::Generator<RecordBatchWithFlag, csp::Dat
115134
if( nextVal.get() == nullptr )
116135
return false;
117136

118-
// Expect a tuple of (RecordBatch, dict, bool)
119-
CSP_TRUE_OR_THROW( PyTuple_Check( nextVal.get() ) && PyTuple_GET_SIZE( nextVal.get() ) == 3,
120-
csp::TypeError, "RecordBatch generator expected to yield (batch, basket_batches, schema_changed) tuples" );
137+
// Expect a tuple of (RecordBatchReader, dict)
138+
CSP_TRUE_OR_THROW( PyTuple_Check( nextVal.get() ) && PyTuple_GET_SIZE( nextVal.get() ) == 2,
139+
csp::TypeError, "Stream factory expected to yield (reader, basket_dict) tuples" );
121140

122-
PyObject *pyBatch = PyTuple_GET_ITEM( nextVal.get(), 0 );
123-
PyObject *pyBasketDict = PyTuple_GET_ITEM( nextVal.get(), 1 );
124-
PyObject *pySchemaChanged = PyTuple_GET_ITEM( nextVal.get(), 2 );
141+
PyObject *pyReader = PyTuple_GET_ITEM( nextVal.get(), 0 );
142+
PyObject *pyBasketDict = PyTuple_GET_ITEM( nextVal.get(), 1 );
125143

126-
value.schemaChanged = PyObject_IsTrue( pySchemaChanged );
144+
// Import main reader via ArrowArrayStream
145+
m_mainReader = importRecordBatchReader( pyReader );
127146

128-
// Import main RecordBatch via PyCapsule C Data Interface
129-
value.batch = importRecordBatch( pyBatch );
130-
131-
// Import basket RecordBatches
132-
value.basketBatches.clear();
147+
// Import basket readers and read their single batch
148+
m_basketBatches.clear();
133149
if( PyDict_Check( pyBasketDict ) )
134150
{
135151
PyObject *key, *val;
@@ -139,43 +155,55 @@ class RecordBatchGenerator : public csp::Generator<RecordBatchWithFlag, csp::Dat
139155
const char *basketName = PyUnicode_AsUTF8( key );
140156
if( !basketName )
141157
CSP_THROW( csp::python::PythonPassthrough, "" );
142-
value.basketBatches[ basketName ] = importRecordBatch( val );
158+
159+
auto basketReader = importRecordBatchReader( val );
160+
std::shared_ptr<::arrow::RecordBatch> batch;
161+
auto status = basketReader -> ReadNext( &batch );
162+
if( !status.ok() )
163+
CSP_THROW( csp::ValueError, "Failed to read basket batch: " << status.ToString() );
164+
if( batch )
165+
m_basketBatches[ basketName ] = batch;
143166
}
144167
}
145168

146169
return true;
147170
}
148171

149-
private:
150-
static std::shared_ptr<::arrow::RecordBatch> importRecordBatch( PyObject *pyBatch )
172+
std::shared_ptr<::arrow::RecordBatchReader> mainReader() override
151173
{
152-
auto exportResult = csp::python::PyObjectPtr::own(
153-
PyObject_CallMethod( pyBatch, "__arrow_c_array__", nullptr ) );
154-
if( !exportResult.get() || PyErr_Occurred() )
155-
CSP_THROW( csp::python::PythonPassthrough, "" );
156-
157-
CSP_TRUE_OR_THROW( PyTuple_Check( exportResult.get() ) && PyTuple_GET_SIZE( exportResult.get() ) == 2,
158-
csp::TypeError, "__arrow_c_array__ expected to return (schema_capsule, array_capsule)" );
174+
return m_mainReader;
175+
}
159176

160-
PyObject *pySchemaCapsule = PyTuple_GET_ITEM( exportResult.get(), 0 );
161-
PyObject *pyArrayCapsule = PyTuple_GET_ITEM( exportResult.get(), 1 );
177+
const std::unordered_map<std::string, std::shared_ptr<::arrow::RecordBatch>> & basketBatches() const override
178+
{
179+
return m_basketBatches;
180+
}
162181

163-
auto *c_schema = reinterpret_cast<struct ArrowSchema *>( PyCapsule_GetPointer( pySchemaCapsule, "arrow_schema" ) );
164-
auto *c_array = reinterpret_cast<struct ArrowArray *>( PyCapsule_GetPointer( pyArrayCapsule, "arrow_array" ) );
182+
private:
183+
static std::shared_ptr<::arrow::RecordBatchReader> importRecordBatchReader( PyObject *pyReader )
184+
{
185+
// Call __arrow_c_stream__() to export as ArrowArrayStream PyCapsule
186+
auto capsule = csp::python::PyObjectPtr::own(
187+
PyObject_CallMethod( pyReader, "__arrow_c_stream__", nullptr ) );
188+
if( !capsule.get() || PyErr_Occurred() )
189+
CSP_THROW( csp::python::PythonPassthrough, "" );
165190

166-
auto schemaResult = arrow::ImportSchema( c_schema );
167-
if( !schemaResult.ok() )
168-
CSP_THROW( csp::ValueError, "Failed to import RecordBatch schema: " << schemaResult.status().ToString() );
191+
auto *stream = reinterpret_cast<struct ArrowArrayStream *>(
192+
PyCapsule_GetPointer( capsule.get(), "arrow_array_stream" ) );
193+
if( !stream )
194+
CSP_THROW( csp::ValueError, "Failed to get ArrowArrayStream from PyCapsule" );
169195

170-
auto batchResult = arrow::ImportRecordBatch( c_array, schemaResult.ValueUnsafe() );
171-
if( !batchResult.ok() )
172-
CSP_THROW( csp::ValueError, "Failed to import RecordBatch: " << batchResult.status().ToString() );
196+
auto result = ::arrow::ImportRecordBatchReader( stream );
197+
if( !result.ok() )
198+
CSP_THROW( csp::ValueError, "Failed to import RecordBatchReader: " << result.status().ToString() );
173199

174-
return std::move( batchResult.ValueUnsafe() );
200+
return result.ValueUnsafe();
175201
}
176202

177-
csp::python::PyObjectPtr m_wrappedGenerator;
203+
csp::python::PyObjectPtr m_factory;
178204
csp::python::PyObjectPtr m_iter;
205+
std::shared_ptr<::arrow::RecordBatchReader> m_mainReader;
206+
std::unordered_map<std::string, std::shared_ptr<::arrow::RecordBatch>> m_basketBatches;
179207
};
180208

181209
}
@@ -185,9 +213,9 @@ namespace csp::python
185213

186214
//AdapterManager
187215
csp::AdapterManager *create_parquet_input_adapter_manager_impl( PyEngine *engine, const Dictionary &properties,
188-
RecordBatchGenerator::Ptr rbGenerator )
216+
ParquetInputAdapterManager::RecordBatchStreamSourcePtr streamSource )
189217
{
190-
auto res = engine -> engine() -> createOwnedObject<ParquetInputAdapterManager>( properties, rbGenerator );
218+
auto res = engine -> engine() -> createOwnedObject<ParquetInputAdapterManager>( properties, streamSource );
191219
return res;
192220
}
193221

@@ -443,9 +471,9 @@ static PyObject *create_parquet_input_adapter_manager( PyObject *args )
443471
&PyFunction_Type, &pyFileGenerator ) )
444472
CSP_THROW( PythonPassthrough, "" );
445473

446-
auto rbGenerator = std::make_shared<RecordBatchGenerator>( pyFileGenerator );
474+
auto streamSource = std::make_shared<PyRecordBatchStreamSource>( pyFileGenerator );
447475
auto *adapterMgr = create_parquet_input_adapter_manager_impl( pyEngine, fromPython<Dictionary>( pyProperties ),
448-
rbGenerator );
476+
streamSource );
449477
auto res = PyCapsule_New( adapterMgr, "adapterMgr", nullptr );
450478
return res;
451479
CSP_RETURN_NULL;

0 commit comments

Comments
 (0)