Skip to content

Commit 83a96fc

Browse files
authored
Add nodes to convert between csp.Structs and Arrow RecordBatches (#698)
* Arrow foundation: type visitor, field reader/writer, and CMake Add core Arrow adapter library with: - ArrowTypeVisitor: visitArrowValueType maps arrow::Type to C++ types - ArrowFieldReader: per-column value extraction from Arrow arrays - ArrowFieldWriter: per-column value serialization to Arrow builders - CMake build for csp_arrow_adapter static library - Centralize Arrow link-target resolution in FindDepsArrowAdapter Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Arrow batch conversion: RecordBatch ↔ Struct Add RecordBatchToStruct and StructToRecordBatch converters for bulk-converting between Arrow RecordBatches and csp::Struct vectors. Support pluggable custom field readers/writers for extension types. Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Arrow Python bindings for batch adapter (scalar types) Add ArrowCppNodes with record_batches_to_struct and struct_to_record_batches CppNode implementations. Make RecordBatchPullInputAdapter schema optional (lazy extraction from first batch). Register graph functions in arrow.py. Simplify arrowadapterimpl build to link against csp_arrow_adapter. Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Tests for Arrow batch adapter scalar type conversion Add tests for record_batches_to_struct and struct_to_record_batches covering all scalar types, nested structs, schema validation, and round-trip conversion. Add RecordBatch pull input adapter tests. Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> * Address review feedback on CppNodes and arrow.py - Remove unnecessary ticked() checks on single-input nodes - Remove extra blank line in START() - Use PyList_GET_ITEM for faster unchecked access in loop - Use typed TS_INPUT(vector<StructPtr>) instead of Generic for struct_to_record_batches - Use standard @csp.node decorator instead of _node_internal_use Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com> --------- Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
1 parent f33f795 commit 83a96fc

18 files changed

Lines changed: 2979 additions & 28 deletions

cpp/cmake/modules/FindDepsArrowAdapter.cmake

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,25 @@ cmake_minimum_required(VERSION 3.7.2)
33
# ARROW
44
find_package(Arrow REQUIRED)
55
include_directories(${ARROW_INCLUDE_DIR})
6+
7+
# Resolve Arrow link targets based on platform and vcpkg configuration.
8+
# Sets CSP_ARROW_LINK_LIBS for use in target_link_libraries().
9+
# On Windows with vcpkg, also applies the ws2_32.dll fix and defines ARROW_STATIC.
10+
if(WIN32)
11+
if(CSP_USE_VCPKG)
12+
set(CSP_ARROW_LINK_LIBS Arrow::arrow_static)
13+
add_compile_definitions(ARROW_STATIC)
14+
else()
15+
# Until we manage to get the fix for ws2_32.dll in arrow-16 into conda, manually fix the error here
16+
get_target_property(LINK_LIBS Arrow::arrow_shared INTERFACE_LINK_LIBRARIES)
17+
string(REPLACE "ws2_32.dll" "ws2_32" FIXED_LINK_LIBS "${LINK_LIBS}")
18+
set_target_properties(Arrow::arrow_shared PROPERTIES INTERFACE_LINK_LIBRARIES "${FIXED_LINK_LIBS}")
19+
set(CSP_ARROW_LINK_LIBS arrow_shared)
20+
endif()
21+
else()
22+
if(CSP_USE_VCPKG)
23+
set(CSP_ARROW_LINK_LIBS arrow_static)
24+
else()
25+
set(CSP_ARROW_LINK_LIBS arrow)
26+
endif()
27+
endif()

cpp/csp/adapters/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@ if(CSP_BUILD_KAFKA_ADAPTER)
33
add_subdirectory(kafka)
44
endif()
55

6+
if(CSP_BUILD_ARROW_ADAPTER)
7+
add_subdirectory(arrow)
8+
endif()
9+
610
if(CSP_BUILD_PARQUET_ADAPTER)
711
add_subdirectory(parquet)
812
endif()

0 commit comments

Comments
 (0)