Skip to content

Commit c1c9f3e

Browse files
authored
Rewrite the parquet input adapter manager (#704)
* Arrow row-by-row processing: ColumnDispatcher, RecordBatchRowProcessor Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Remove old C++ reader classes and file wrappers Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Rewrite ParquetInputAdapterManager for RecordBatch input Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Python adapter: RecordBatch stream factories and C Stream Interface Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Tests for parquet input adapter rewrite Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Qualify arrow:: as ::arrow:: in writer headers to avoid namespace ambiguity The introduction of namespace csp::adapters::arrow (for the new ColumnDispatcher/RecordBatchRowProcessor classes) creates ambiguity when writer-side headers use unqualified arrow:: inside namespace csp::adapters::parquet. The compiler finds the sibling csp::adapters::arrow namespace before the global ::arrow namespace. Also forward-declares ColumnDispatcher and RecordBatchRowProcessor in ParquetInputAdapterManager.h (moving full includes to .cpp) and adds direct includes for csp/core/Exception.h and arrow/table.h that were previously provided transitively through the now-deleted reader headers. Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Fix Arrow 21 compatibility: use out-parameter FileReader::Make Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Add comprehensive test coverage for parquet input adapter Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Remove dead m_rbSources member from DictBasketReaderRecord, and format Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Optimize hot path: InlineReader for zero-overhead value extraction Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Add comprehensive tests for all Arrow types and edge cases Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Address review: remove dead code, hoist loop invariant, deduplicate string lambdas - Remove unused m_basketSymbolColumn member - Remove dead properties.get line before CSP_THROW - Hoist phase variable out of loop - Use generic lambda for string/binary extraction - Leave timeUnitMultiplier inline because constexpr fails with CSP_THROW under this C++20 build Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Address review: private SourceEntry, const fields, deduplicate batch logic, hoist columns - Make SourceEntry private in RecordBatchRowProcessor - Make m_arrowTypeId and m_columnName const - Replace duplicated first-batch loop with fetchNextBatch call - Hoist columns() out of rebindSource loop Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Address review: static lambda, remove tz member, remove redundant override - Make viewToString lambda static in createColumnDispatcher - Remove m_defaultTimezone member, validate tz inline and discard - Remove redundant doReadNextValue override from LambdaReader (base class handles it) Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * perf: add prefetch and parallel column decode for parquet reading Add PrefetchingRecordBatchReader that decodes the next RecordBatch on a background thread while CSP processes the current batch. Also enable Arrow's use_threads and pre_buffer for parallel column decoding and IO range caching. The PrefetchingRecordBatchReader co-owns the FileReader (via shared_ptr) to guarantee the FileReader outlives the background prefetch thread, even when CSP stops mid-file. Benchmarks show ~15% average speedup, up to 1.5x on filtered reads and wide structs, with no regressions. Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Address comments Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Address review: fix bugs, remove dead code, clean up test scaffolding Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Clean up tests Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Replace PrefetchingRecordBatchReader with async generator API Redesign the batch-reading interface to use Arrow's async generator (GetRecordBatchGenerator) natively rather than wrapping it in a synchronous RecordBatchReader subclass. Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Fix allow_missing_files not honored in split-columns native path Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> --------- Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
1 parent 3f98441 commit c1c9f3e

37 files changed

Lines changed: 5823 additions & 3573 deletions

CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,12 @@ option(CSP_ENABLE_UBSAN "Build with undefined behavior sanitizer" OFF)
7474
option(CSP_BUILD_KAFKA_ADAPTER "Build kafka adapter" ON)
7575
option(CSP_BUILD_ARROW_ADAPTER "Build arrow adapter" ON)
7676
option(CSP_BUILD_PARQUET_ADAPTER "Build parquet adapter" ON)
77+
78+
# Parquet adapter depends on arrow adapter
79+
if(CSP_BUILD_PARQUET_ADAPTER AND NOT CSP_BUILD_ARROW_ADAPTER)
80+
message(STATUS "Enabling arrow adapter (required by parquet adapter)")
81+
set(CSP_BUILD_ARROW_ADAPTER ON CACHE BOOL "Build arrow adapter" FORCE)
82+
endif()
7783
option(CSP_BUILD_WS_CLIENT_ADAPTER "Build ws client adapter" ON)
7884

7985
# Normalize build type for downstream comparisons

cpp/cmake/modules/FindDepsParquetAdapter.cmake

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,29 @@
11
cmake_minimum_required(VERSION 3.7.2)
22

3-
# ARROW
4-
find_package(Arrow REQUIRED)
5-
include_directories(${ARROW_INCLUDE_DIR})
3+
# ARROW (reuse FindDepsArrowAdapter for find_package + link target resolution)
4+
find_package(DepsArrowAdapter REQUIRED)
65

76
# PARQUET
87
find_package(Parquet REQUIRED)
98
include_directories(${PARQUET_INCLUDE_DIR})
109

10+
# Resolve Parquet link targets based on platform and vcpkg configuration.
11+
# Sets CSP_PARQUET_LINK_LIBS for use in target_link_libraries().
12+
if(WIN32)
13+
if(CSP_USE_VCPKG)
14+
set(CSP_PARQUET_LINK_LIBS Parquet::parquet_static)
15+
add_compile_definitions(PARQUET_STATIC)
16+
else()
17+
set(CSP_PARQUET_LINK_LIBS parquet_shared)
18+
endif()
19+
else()
20+
if(CSP_USE_VCPKG)
21+
set(CSP_PARQUET_LINK_LIBS parquet_static)
22+
else()
23+
set(CSP_PARQUET_LINK_LIBS parquet)
24+
endif()
25+
endif()
26+
1127
# Other deps
1228
find_package(Thrift REQUIRED)
1329
find_package(Brotli REQUIRED)

0 commit comments

Comments
 (0)