Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 52 additions & 5 deletions cpp/csp/adapters/arrow/ArrowTypeVisitor.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
// Arrow type visitor: maps arrow::Type::type to the corresponding C++ value type.
// Eliminates repeated switch statements on arrow types across the codebase.
//
// Usage:
// visitArrowValueType( typeId,
// [&]( auto tag ) { using T = typename decltype(tag)::type; ... },
// [&]() { /* unsupported */ } );
// [&]( auto tag ) -> ReturnType {
// using T = typename decltype( tag )::type;
// return doSomething<T>( ... );
// },
// [&]() -> ReturnType { /* unsupported type fallback */ } );

#ifndef _IN_CSP_ADAPTERS_ARROW_ArrowTypeVisitor_H
#define _IN_CSP_ADAPTERS_ARROW_ArrowTypeVisitor_H
Expand All @@ -20,12 +24,14 @@ namespace csp::adapters::arrow
// Empty tag object that carries a C++ type through a value.
// A visitor passes TypeTag<T>{} to a generic lambda, which recovers T with
// `typename decltype( tag )::type`.
template<typename T>
struct TypeTag
{
    using type = T;
};

// Invokes fn(TypeTag<CppType>{}) for the C++ type corresponding to the arrow type.
// Invokes fn( TypeTag<CppType>{} ) for the C++ value type corresponding to
// the given arrow type. Calls onDefault() for unrecognised arrow types.
template<typename Fn, typename DefaultFn>
decltype(auto) visitArrowValueType( ::arrow::Type::type typeId, Fn && fn, DefaultFn && onDefault )
{
switch( typeId )
{
// --- Numeric ---
case ::arrow::Type::BOOL: return fn( TypeTag<bool>{} );
case ::arrow::Type::INT8: return fn( TypeTag<int8_t>{} );
case ::arrow::Type::INT16: return fn( TypeTag<int16_t>{} );
Expand All @@ -39,6 +45,7 @@ decltype(auto) visitArrowValueType( ::arrow::Type::type typeId, Fn && fn, Defaul
case ::arrow::Type::FLOAT:
case ::arrow::Type::DOUBLE: return fn( TypeTag<double>{} );

// --- String / Binary ---
case ::arrow::Type::STRING:
case ::arrow::Type::LARGE_STRING:
case ::arrow::Type::BINARY:
Expand All @@ -47,13 +54,15 @@ decltype(auto) visitArrowValueType( ::arrow::Type::type typeId, Fn && fn, Defaul
case ::arrow::Type::DICTIONARY:
return fn( TypeTag<std::string>{} );

// --- Temporal ---
case ::arrow::Type::TIMESTAMP: return fn( TypeTag<DateTime>{} );
case ::arrow::Type::DURATION: return fn( TypeTag<TimeDelta>{} );
case ::arrow::Type::DATE32:
case ::arrow::Type::DATE64: return fn( TypeTag<Date>{} );
case ::arrow::Type::TIME32:
case ::arrow::Type::TIME64: return fn( TypeTag<Time>{} );

// --- List / Struct ---
case ::arrow::Type::LIST:
case ::arrow::Type::LARGE_LIST:
return fn( TypeTag<DialectGenericType>{} );
Expand All @@ -65,7 +74,7 @@ decltype(auto) visitArrowValueType( ::arrow::Type::type typeId, Fn && fn, Defaul
}
}

// Overload that throws TypeError for unrecognised types.
// Overload that throws TypeError for unrecognised arrow types.
template<typename Fn>
decltype(auto) visitArrowValueType( ::arrow::Type::type typeId, Fn && fn )
{
Expand All @@ -76,6 +85,44 @@ decltype(auto) visitArrowValueType( ::arrow::Type::type typeId, Fn && fn )
} );
}

// Dispatches on a CspType::Type and invokes fn( TypeTag<CppType>{} ) with the
// C++ value type that corresponds to it. Any type id without a case below is
// routed to onDefault(). The result of whichever callable ran is returned, so
// fn (for every tag) and onDefault must all yield the same type.
template<typename Fn, typename DefaultFn>
decltype(auto) visitCspValueType( CspType::Type cspType, Fn && fn, DefaultFn && onDefault )
{
    using Kind = CspType::Type;

    switch( cspType )
    {
        // --- Boolean / floating point ---
        case Kind::BOOL:      return fn( TypeTag<bool>{} );
        case Kind::DOUBLE:    return fn( TypeTag<double>{} );

        // --- Signed integers ---
        case Kind::INT8:      return fn( TypeTag<int8_t>{} );
        case Kind::INT16:     return fn( TypeTag<int16_t>{} );
        case Kind::INT32:     return fn( TypeTag<int32_t>{} );
        case Kind::INT64:     return fn( TypeTag<int64_t>{} );

        // --- Unsigned integers ---
        case Kind::UINT8:     return fn( TypeTag<uint8_t>{} );
        case Kind::UINT16:    return fn( TypeTag<uint16_t>{} );
        case Kind::UINT32:    return fn( TypeTag<uint32_t>{} );
        case Kind::UINT64:    return fn( TypeTag<uint64_t>{} );

        // --- Temporal ---
        case Kind::DATETIME:  return fn( TypeTag<DateTime>{} );
        case Kind::TIMEDELTA: return fn( TypeTag<TimeDelta>{} );
        case Kind::DATE:      return fn( TypeTag<Date>{} );
        case Kind::TIME:      return fn( TypeTag<Time>{} );

        // --- String / Enum ---
        case Kind::STRING:    return fn( TypeTag<std::string>{} );
        case Kind::ENUM:      return fn( TypeTag<CspEnum>{} );

        // --- Everything else ---
        default:              return onDefault();
    }
}

// Two-argument form: same dispatch as the three-argument visitCspValueType,
// but a type id with no mapping raises TypeError instead of calling a
// user-supplied fallback.
template<typename Fn>
decltype(auto) visitCspValueType( CspType::Type cspType, Fn && fn )
{
    // The fallback must advertise the same return type as fn so that the
    // decltype(auto) deduction in the three-argument overload is consistent.
    using Result = decltype( fn( TypeTag<bool>{} ) );

    auto throwUnhandled = [cspType]() -> Result
    {
        CSP_THROW( TypeError, "Unhandled CspType in visitor: " << cspType );
    };

    return visitCspValueType( cspType, std::forward<Fn>( fn ), std::move( throwUnhandled ) );
}

} // namespace csp::adapters::arrow

#endif // _IN_CSP_ADAPTERS_ARROW_ArrowTypeVisitor_H
#endif
152 changes: 152 additions & 0 deletions cpp/csp/adapters/parquet/ArrowBackedArrayBuilder.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
#include <csp/adapters/parquet/ArrowBackedArrayBuilder.h>
#include <csp/adapters/arrow/ArrowFieldWriter.h>
#include <csp/adapters/arrow/ArrowTypeVisitor.h>

namespace csp::adapters::parquet
{

// Builds a StructField named `name` whose value type matches `cspType`.
// STRING and ENUM get their dedicated field classes; every other type known to
// visitCspValueType becomes a NativeStructField<T>. Throws TypeError for any
// type the visitor does not map.
static StructFieldPtr createStructFieldFromCspType( const std::string & name, CspTypePtr cspType )
{
    const auto typeId = cspType -> type();

    // Types with dedicated field classes are resolved before the generic visitor.
    if( typeId == CspType::Type::STRING )
        return std::make_shared<StringStructField>( CspType::STRING(), name, false );
    else if( typeId == CspType::Type::ENUM )
        return std::make_shared<CspEnumStructField>( cspType, name, false );

    auto makeNativeField = [&]( auto tag ) -> StructFieldPtr
    {
        using T = typename decltype( tag )::type;
        if constexpr( !std::is_same_v<T, std::string> && !std::is_same_v<T, CspEnum> )
            return std::make_shared<NativeStructField<T>>( name, false );
        else
            return StructFieldPtr{}; // unreachable; STRING/ENUM handled above
    };

    auto rejectType = [&]() -> StructFieldPtr
    {
        CSP_THROW( TypeError, "Unsupported CSP type for struct field: " << typeId );
    };

    return csp::adapters::arrow::visitCspValueType( typeId, makeNativeField, rejectType );
}

// Creates a one-field StructMeta called "__scratch_<name>" whose only field is
// `name` with the value type given by `cspType`, and returns the meta together
// with the field as looked up on that meta.
ScratchFieldInfo createScratchField( const std::string & name, CspTypePtr cspType )
{
    StructMeta::Fields fields{ createStructFieldFromCspType( name, cspType ) };

    auto scratchMeta = std::make_shared<StructMeta>( "__scratch_" + name, std::move( fields ), false );

    // Return the field obtained from the meta rather than the one passed in.
    return ScratchFieldInfo{ scratchMeta, scratchMeta -> field( name ) };
}

// Scratch-mode constructor.
//
// Builds a private single-field struct ("scratch") that the caller writes the
// current row's value into, plus a FieldWriter that reads that field on
// handleRowFinished().
//
// columnName  name of the output column; also the name of the scratch field
// chunkSize   forwarded to ArrowSingleColumnArrayBuilder
// cspType     value type of the column (not consulted when isBytes is true)
// isBytes     when true the scratch field is a bytes-flavoured string field so
//             that the FieldWriter produces binary output
//
// Exactly one scratch meta and one scratch struct are created; the bytes case
// no longer builds and throws away a plain STRING scratch first.
ArrowBackedArrayBuilder::ArrowBackedArrayBuilder(
        const std::string & columnName, std::uint32_t chunkSize,
        CspTypePtr cspType, bool isBytes )
    : ArrowSingleColumnArrayBuilder( columnName, chunkSize )
    , m_isScratchMode( true )
{
    if( isBytes )
    {
        // createScratchField always produces a plain STRING field for STRING
        // types, so the bytes-flavoured field is assembled here directly.
        auto bytesField = std::make_shared<StringStructField>(
            std::make_shared<CspStringType>( true ), columnName, false );
        m_scratchMeta = std::make_shared<StructMeta>(
            "__scratch_bytes_" + columnName,
            StructMeta::Fields{ bytesField },
            false
        );
        m_field = m_scratchMeta -> field( columnName );
    }
    else
    {
        auto scratch = createScratchField( columnName, cspType );
        m_scratchMeta = scratch.meta;
        m_field       = scratch.field;
    }

    m_scratch = m_scratchMeta -> create();

    init( columnName, m_field );
}

// External-mode constructor: no scratch struct is created. The writer is built
// for the supplied struct field, and rows are read from whatever struct the
// caller hands over via setStruct().
ArrowBackedArrayBuilder::ArrowBackedArrayBuilder(
        const std::string & columnName, std::uint32_t chunkSize,
        const StructFieldPtr & structField )
    : ArrowSingleColumnArrayBuilder( columnName, chunkSize ),
      m_field( structField ),
      m_isScratchMode( false )
{
    init( columnName, m_field );
}

// Defined in this translation unit (which includes ArrowFieldWriter.h) because the
// header only forward-declares FieldWriter while holding it in a std::unique_ptr;
// the unique_ptr's deleter needs the complete type at the point of destruction.
ArrowBackedArrayBuilder::~ArrowBackedArrayBuilder() = default;

void ArrowBackedArrayBuilder::init( const std::string & columnName, const StructFieldPtr & field )
{
auto created = csp::adapters::arrow::createFieldWriter( columnName, field );
m_writer = std::move( created.writer );
}

// Returns the first data type reported by the underlying FieldWriter.
std::shared_ptr<::arrow::DataType> ArrowBackedArrayBuilder::getDataType()
{
    auto && writerTypes = m_writer -> dataTypes();
    return writerTypes[0];
}

// Length currently reported by the FieldWriter's underlying builder.
int64_t ArrowBackedArrayBuilder::length() const
{
    auto && underlying = m_writer -> builder();
    return underlying -> length();
}

// Commits the current row to the writer.
//   External mode: writes from the struct supplied via setStruct() if one was
//                  provided for this row, otherwise writes a null; the pending
//                  struct is forgotten once written.
//   Scratch mode:  writes from the owned scratch struct, then clears the
//                  field's isSet bit so the next row starts out unset.
void ArrowBackedArrayBuilder::handleRowFinished()
{
    if( !m_isScratchMode )
    {
        // No struct offered for this row -> null entry.
        if( !m_hasExternalValue )
        {
            m_writer -> writeNull();
            return;
        }

        m_writer -> writeNext( m_externalStruct );

        // Consume the pending value so it is not written again next row.
        m_hasExternalValue = false;
        m_externalStruct   = nullptr;
        return;
    }

    // Scratch mode. Per the original note, writeNext inspects isSet itself and
    // appends either the value or a null.
    auto * row = m_scratch.get();
    m_writer -> writeNext( row );
    m_field -> clearIsSet( row );
}

// Finishes the FieldWriter and returns its single output array.
// Throws a runtime error if the writer produced anything other than exactly
// one array.
std::shared_ptr<::arrow::Array> ArrowBackedArrayBuilder::buildArray()
{
    auto finished = m_writer -> finish();
    const auto produced = finished.size();

    CSP_TRUE_OR_THROW_RUNTIME( produced == 1,
                               "ArrowBackedArrayBuilder expected 1 array from FieldWriter, got " << produced );

    return finished[0];
}

// Factory for a scratch-mode builder: forwards all arguments to the
// ( columnName, chunkSize, cspType, isBytes ) constructor.
std::shared_ptr<ArrowBackedArrayBuilder> createArrowBackedArrayBuilder(
        const std::string & columnName, std::uint32_t chunkSize,
        CspTypePtr cspType, bool isBytes )
{
    auto builder = std::make_shared<ArrowBackedArrayBuilder>( columnName, chunkSize, cspType, isBytes );
    return builder;
}

// Factory for an external-mode builder: forwards all arguments to the
// ( columnName, chunkSize, structField ) constructor.
std::shared_ptr<ArrowBackedArrayBuilder> createArrowBackedArrayBuilderForField(
        const std::string & columnName, std::uint32_t chunkSize,
        const StructFieldPtr & structField )
{
    auto builder = std::make_shared<ArrowBackedArrayBuilder>( columnName, chunkSize, structField );
    return builder;
}

} // namespace csp::adapters::parquet
85 changes: 85 additions & 0 deletions cpp/csp/adapters/parquet/ArrowBackedArrayBuilder.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#ifndef _IN_CSP_ADAPTERS_PARQUET_ArrowBackedArrayBuilder_H
#define _IN_CSP_ADAPTERS_PARQUET_ArrowBackedArrayBuilder_H

#include <csp/adapters/parquet/ArrowSingleColumnArrayBuilder.h>

// Forward declare FieldWriter (defined in csp/adapters/arrow/ArrowFieldWriter.h) to keep this header light.
namespace csp::adapters::arrow { class FieldWriter; }

namespace csp::adapters::parquet
{

// Result of createScratchField: a generated single-field StructMeta together
// with the field that lives on it.
struct ScratchFieldInfo
{
// Meta of the generated one-field struct.
std::shared_ptr<StructMeta> meta;
// The field named after the column, as looked up on `meta`.
StructFieldPtr field;
};

// Factory: create a scratch StructMeta + field for a given CSP type.
// The meta is named "__scratch_" + name and contains exactly one field, `name`.
// Throws TypeError when cspType has no supported struct-field mapping.
ScratchFieldInfo createScratchField( const std::string & name, CspTypePtr cspType );

// Wraps an ArrowFieldWriter inside the ArrowSingleColumnArrayBuilder interface.
// Two modes:
// Scratch mode: owns a single-field struct, caller writes value, handleRowFinished appends.
// External mode: caller provides a Struct*, handleRowFinished reads field and appends.
// The mode is fixed by which constructor is used and never changes afterwards.
class ArrowBackedArrayBuilder : public ArrowSingleColumnArrayBuilder
{
public:
// Scratch mode: creates FieldWriter + scratch struct from a CspType.
// Used by SingleColumnParquetOutputHandler.
// When isBytes is true the scratch field is a bytes-flavoured string field so the
// writer produces binary output.
ArrowBackedArrayBuilder( const std::string & columnName, std::uint32_t chunkSize,
CspTypePtr cspType, bool isBytes = false );

// External mode: creates FieldWriter from an existing StructFieldPtr.
// Used by StructParquetOutputHandler (reads from source struct directly).
ArrowBackedArrayBuilder( const std::string & columnName, std::uint32_t chunkSize,
const StructFieldPtr & structField );

// Declared here and defined in the .cpp, where FieldWriter is a complete type
// (this header only forward-declares it).
~ArrowBackedArrayBuilder() override;

// --- ArrowSingleColumnArrayBuilder interface ---
// Data type reported by the underlying FieldWriter.
std::shared_ptr<::arrow::DataType> getDataType() override;
// Length of the FieldWriter's underlying builder.
int64_t length() const override;
// Appends the current row (value or null) to the writer; see the mode notes above.
void handleRowFinished() override;
// Finishes the writer and returns its single array; throws if it did not
// produce exactly one array.
std::shared_ptr<::arrow::Array> buildArray() override;

// --- Scratch mode API ---
// Get the scratch struct for value setting
// NOTE(review): the external-mode constructor never assigns m_scratch, so this
// is only meaningful in scratch mode.
Struct * scratch() { return m_scratch.get(); }
// Field the writer reads from. In scratch mode this is the scratch struct's field;
// in external mode it is the field passed to the constructor.
const StructFieldPtr & scratchField() const { return m_field; }

// --- External mode API ---
// Set the source struct for this row (marks value available)
// The pointer is stored, not copied, and is read by the next handleRowFinished(),
// so the struct must remain valid until then. handleRowFinished() resets it.
void setStruct( const Struct * s )
{
m_externalStruct = s;
m_hasExternalValue = true;
}

private:
// Creates the FieldWriter for `field`; called by both constructors.
void init( const std::string & columnName, const StructFieldPtr & field );

std::unique_ptr<csp::adapters::arrow::FieldWriter> m_writer;
std::shared_ptr<StructMeta> m_scratchMeta; // non-null in scratch mode
StructPtr m_scratch; // non-null in scratch mode
StructFieldPtr m_field;

// External mode state
const Struct * m_externalStruct = nullptr; // non-owning; pending source struct for the current row
bool m_hasExternalValue = false; // true between setStruct() and the next handleRowFinished()
bool m_isScratchMode = false; // set by the constructors; selects the handleRowFinished() path
};

// Factory: create ArrowBackedArrayBuilder for a given CspType (scratch mode).
// Arguments are forwarded unchanged to the scratch-mode constructor; isBytes
// requests a bytes-flavoured string scratch field.
std::shared_ptr<ArrowBackedArrayBuilder> createArrowBackedArrayBuilder(
const std::string & columnName, std::uint32_t chunkSize,
CspTypePtr cspType, bool isBytes = false );

// Factory: create ArrowBackedArrayBuilder for a struct field (external mode).
// Arguments are forwarded unchanged to the external-mode constructor; the source
// struct for each row is supplied later through setStruct().
std::shared_ptr<ArrowBackedArrayBuilder> createArrowBackedArrayBuilderForField(
const std::string & columnName, std::uint32_t chunkSize,
const StructFieldPtr & structField );

} // namespace csp::adapters::parquet

#endif
49 changes: 0 additions & 49 deletions cpp/csp/adapters/parquet/ArrowIPCFileWriterWrapper.cpp

This file was deleted.

Loading
Loading