Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions cpp-ch/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,24 @@ else()
endif()
endif()

set(CH_PATCH_FILES
${CMAKE_CURRENT_SOURCE_DIR}/patches/fix-orc-missing-nullable-complex-columns.patch)
foreach(CH_PATCH_FILE IN LISTS CH_PATCH_FILES)
execute_process(
COMMAND git apply --reverse --check ${CH_PATCH_FILE}
WORKING_DIRECTORY ${CH_SOURCE_DIR}
RESULT_VARIABLE CH_PATCH_REVERSE_CHECK_RESULT
OUTPUT_QUIET ERROR_QUIET)
if(NOT CH_PATCH_REVERSE_CHECK_RESULT EQUAL 0)
execute_process(
COMMAND git apply --check ${CH_PATCH_FILE}
WORKING_DIRECTORY ${CH_SOURCE_DIR} COMMAND_ERROR_IS_FATAL ANY)
execute_process(
COMMAND git apply ${CH_PATCH_FILE}
WORKING_DIRECTORY ${CH_SOURCE_DIR} COMMAND_ERROR_IS_FATAL ANY)
endif()
endforeach()

if(EXISTS "${CH_SOURCE_DIR}/utils/extern-local-engine")
execute_process(COMMAND rm -rf ${CH_SOURCE_DIR}/utils/extern-local-engine)
endif()
Expand Down
4 changes: 2 additions & 2 deletions cpp-ch/clickhouse.version
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20260425-25.12.10.7
CH_COMMIT=54c5bf9a97b
CH_BRANCH=rebase_ch/20260425-25.12.10.7-complextype
CH_COMMIT=b1510a2
11 changes: 11 additions & 0 deletions cpp-ch/local-engine/Common/CHUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,17 @@ BlockUtil::convertColumnAsNecessary(const DB::ColumnWithTypeAndName & column, co
DB::JoinCommon::convertColumnToNullable(nullable_column);
return {nullable_column.column, sample_column.type, sample_column.name};
}
else if (!sample_column.type->isNullable() && column.type->isNullable() && sample_column.type->equals(*DB::removeNullable(column.type)))
{
const auto * nullable_column = DB::checkAndGetColumn<DB::ColumnNullable>(column.column.get());
if (!nullable_column || std::ranges::any_of(nullable_column->getNullMapData(), [](UInt8 is_null) { return is_null != 0; }))
throw DB::Exception(
DB::ErrorCodes::LOGICAL_ERROR,
"Cannot convert nullable column with NULL values to non-nullable type. original:{} expected:{}",
column.dumpStructure(),
sample_column.dumpStructure());
return {nullable_column->getNestedColumnPtr(), sample_column.type, sample_column.name};
}
else
throw DB::Exception(
DB::ErrorCodes::LOGICAL_ERROR,
Expand Down
5 changes: 2 additions & 3 deletions cpp-ch/local-engine/Functions/SparkFunctionTupleElement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class SparkFunctionTupleElement : public IFunction
Columns null_maps;
while (const DataTypeArray * array_type = checkAndGetDataType<DataTypeArray>(removeNullable(input_type).get()))
{
const ColumnNullable * nullable_array_col = input_type->isNullable() ? checkAndGetColumn<ColumnNullable>(input_col) : nullptr;
const ColumnNullable * nullable_array_col = checkAndGetColumn<ColumnNullable>(input_col);
const ColumnArray * array_col = nullable_array_col ? checkAndGetColumn<ColumnArray>(&nullable_array_col->getNestedColumn())
: checkAndGetColumn<ColumnArray>(input_col);

Expand All @@ -147,8 +147,7 @@ class SparkFunctionTupleElement : public IFunction
getName(),
input_arg.type->getName());

const ColumnNullable * input_col_as_nullable_tuple
= input_type->isNullable() ? checkAndGetColumn<ColumnNullable>(input_col) : nullptr;
const ColumnNullable * input_col_as_nullable_tuple = checkAndGetColumn<ColumnNullable>(input_col);
const ColumnTuple * input_col_as_tuple = input_col_as_nullable_tuple
? checkAndGetColumn<ColumnTuple>(&input_col_as_nullable_tuple->getNestedColumn())
: checkAndGetColumn<ColumnTuple>(input_col);
Expand Down
26 changes: 22 additions & 4 deletions cpp-ch/local-engine/Parser/ExpressionParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,24 @@ ExpressionParser::addConstColumn(DB::ActionsDAG & actions_dag, const DB::DataTyp
{
String name = DB::fieldToString(field).substr(0, 10);
name = getUniqueName(name);
const auto * res_node = &actions_dag.addColumn(DB::ColumnWithTypeAndName(type->createColumnConst(1, field), type, name));
/// Substrait null literals carry a concrete type; CH `createColumnConst` inserts into the nested column (e.g. ColumnArray),
/// which cannot accept `Field::Null` unless the type is Nullable(...).
DB::DataTypePtr const_type = type;
DB::Field const_field = field;
if (field.isNull() && !type->isNullable())
{
// Some complex types intentionally drop top-level nullability in CH representation, e.g. LIST -> Array(...).
// Keep the literal consistent with the non-nullable CH type so later tuple casts do not try to remove a real NULL.
const auto type_without_nullable = DB::removeNullable(type);
if (typeid_cast<const DB::DataTypeArray *>(type_without_nullable.get())
|| typeid_cast<const DB::DataTypeMap *>(type_without_nullable.get())
|| typeid_cast<const DB::DataTypeTuple *>(type_without_nullable.get()))
const_field = type->getDefault();
else
const_type = makeNullable(type);
}
const auto * res_node = &actions_dag.addColumn(
DB::ColumnWithTypeAndName(const_type->createColumnConst(1, const_field), const_type, name));
if (reuseCSE())
{
// The new node, res_node will be remained in the ActionsDAG, but it will not affect the execution.
Expand Down Expand Up @@ -307,6 +324,7 @@ ExpressionParser::NodeRawConstPtr ExpressionParser::parseExpression(ActionsDAG &
const auto & input_type = args[0]->result_type;
DataTypePtr denull_input_type = removeNullable(input_type);
DataTypePtr output_type = TypeParser::parseType(substrait_type);
DataTypePtr cast_output_type = input_type->isNullable() && !output_type->isNullable() ? makeNullable(output_type) : output_type;
DataTypePtr denull_output_type = removeNullable(output_type);
const ActionsDAG::Node * result_node = nullptr;
if (substrait_type.has_binary())
Expand Down Expand Up @@ -351,7 +369,7 @@ ExpressionParser::NodeRawConstPtr ExpressionParser::parseExpression(ActionsDAG &
else if (isString(denull_input_type) && substrait_type.has_bool_())
{
/// cast(string to boolean)
args.emplace_back(addConstColumn(actions_dag, std::make_shared<DataTypeString>(), output_type->getName()));
args.emplace_back(addConstColumn(actions_dag, std::make_shared<DataTypeString>(), cast_output_type->getName()));
result_node = toFunctionNode(actions_dag, "accurateCastOrNull", args);
}
else if (isString(denull_input_type) && isInt(denull_output_type))
Expand All @@ -360,13 +378,13 @@ ExpressionParser::NodeRawConstPtr ExpressionParser::parseExpression(ActionsDAG &
/// Refer to https://github.com/apache/gluten/issues/4956 and https://github.com/apache/gluten/issues/8598
const auto * trim_str_arg = addConstColumn(actions_dag, std::make_shared<DataTypeString>(), " \t\n\r\f");
args[0] = toFunctionNode(actions_dag, "trimBothSpark", {args[0], trim_str_arg});
args.emplace_back(addConstColumn(actions_dag, std::make_shared<DataTypeString>(), output_type->getName()));
args.emplace_back(addConstColumn(actions_dag, std::make_shared<DataTypeString>(), cast_output_type->getName()));
result_node = toFunctionNode(actions_dag, "CAST", args);
}
else
{
/// Common process: CAST(input, type)
args.emplace_back(addConstColumn(actions_dag, std::make_shared<DataTypeString>(), output_type->getName()));
args.emplace_back(addConstColumn(actions_dag, std::make_shared<DataTypeString>(), cast_output_type->getName()));
result_node = toFunctionNode(actions_dag, "CAST", args);
}

Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ QueryPlanStepPtr ReadRelParser::parseReadRelWithLocalFile(const substrait::ReadR
auto source = std::make_shared<SubstraitFileSource>(getContext(), header, local_files);
auto source_pipe = Pipe(source);
auto source_step = std::make_unique<SubstraitFileSourceStep>(getContext(), std::move(source_pipe), "substrait local files");
if (format_settings.parquet.use_native_reader_v3 && !readRowIndex && onlyFlatType)
if (format_settings.parquet.use_native_reader_v3 && !readRowIndex)
source_step->setStepDescription("ParquetReaderV3");
Comment on lines +204 to 205
Comment on lines +204 to 205
else
source_step->setStepDescription("ParquetReader");
Expand Down
3 changes: 3 additions & 0 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <Core/Names.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/IDataType.h>
#include <Functions/FunctionFactory.h>
Expand Down Expand Up @@ -148,6 +149,8 @@ void SerializedPlanParser::adjustOutput(const DB::QueryPlanPtr & query_plan, con
const auto & origin_column = origin_columns[i];
const auto & origin_type = origin_column.type;
auto final_type = TypeParser::parseType(output_schema.types(i));
if (origin_type->isNullable() && !final_type->isNullable())
final_type = makeNullable(final_type);

/// Intermediate aggregate data is special, no check here.
if (typeid_cast<const DataTypeAggregateFunction *>(origin_column.type.get()) || origin_type->equals(*final_type))
Expand Down
80 changes: 79 additions & 1 deletion cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/
#include "SparkRowToCHColumn.h"
#include <algorithm>
#include <memory>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
Expand Down Expand Up @@ -47,6 +48,69 @@ jmethodID SparkRowToCHColumn::spark_row_interator_hasNext = nullptr;
jmethodID SparkRowToCHColumn::spark_row_interator_next = nullptr;
jmethodID SparkRowToCHColumn::spark_row_iterator_nextBatch = nullptr;

static Field normalizeFieldForType(Field field, const DataTypePtr & type)
{
const auto type_without_nullable = removeNullable(type);

if (field.isNull())
return type->isNullable() ? field : type_without_nullable->getDefault();

if (type->isNullable())
return normalizeFieldForType(std::move(field), type_without_nullable);

if (const auto * array_type = typeid_cast<const DataTypeArray *>(type_without_nullable.get()))
{
if (field.getType() != Field::Types::Array)
return field;

auto & array = field.safeGet<Array>();
const auto & nested_type = array_type->getNestedType();
for (auto & element : array)
element = normalizeFieldForType(std::move(element), nested_type);
return field;
}

if (const auto * map_type = typeid_cast<const DataTypeMap *>(type_without_nullable.get()))
{
if (field.getType() != Field::Types::Map)
return field;

auto & map = field.safeGet<Map>();
const auto & key_type = map_type->getKeyType();
const auto & value_type = map_type->getValueType();
for (auto & entry : map)
{
if (entry.isNull())
{
entry = Tuple{key_type->getDefault(), value_type->getDefault()};
continue;
}

auto & tuple = entry.safeGet<Tuple>();
if (tuple.size() == 2)
{
tuple[0] = normalizeFieldForType(std::move(tuple[0]), key_type);
tuple[1] = normalizeFieldForType(std::move(tuple[1]), value_type);
}
}
return field;
}

if (const auto * tuple_type = typeid_cast<const DataTypeTuple *>(type_without_nullable.get()))
{
if (field.getType() != Field::Types::Tuple)
return field;

auto & tuple = field.safeGet<Tuple>();
const auto & element_types = tuple_type->getElements();
for (size_t i = 0; i < std::min(tuple.size(), element_types.size()); ++i)
tuple[i] = normalizeFieldForType(std::move(tuple[i]), element_types[i]);
return field;
}

return field;
}

ALWAYS_INLINE static void writeRowToColumns(const std::vector<MutableColumnPtr> & columns, const SparkRowReader & spark_row_reader)
{
auto num_fields = columns.size();
Expand All @@ -64,7 +128,21 @@ ALWAYS_INLINE static void writeRowToColumns(const std::vector<MutableColumnPtr>
columns[i]->insert(spark_row_reader.getField(i)); // read decimal128
}
else
columns[i]->insert(spark_row_reader.getField(i));
{
DB::Field field = spark_row_reader.getField(i);
const auto & type = spark_row_reader.getFieldTypes()[i];
if (field.isNull())
{
if (type->isNullable())
columns[i]->insert(field);
else
columns[i]->insertDefault();
}
else if (spark_row_reader.needNestedNullNormalization(i))
columns[i]->insert(normalizeFieldForType(std::move(field), type));
else
columns[i]->insert(field);
}
Comment on lines +131 to +145
}
}

Expand Down
37 changes: 37 additions & 0 deletions cpp-ch/local-engine/Parser/SparkRowToCHColumn.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
*/
#pragma once

#include <algorithm>
#include <memory>
#include <jni.h>
#include <Core/Block.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Parser/CHColumnToSparkRow.h>
#include <Parser/TypeParser.h>
Expand Down Expand Up @@ -170,13 +174,33 @@ class FixedLengthDataReader
class SparkRowReader
{
public:
static bool needNestedNullNormalization(const DB::DataTypePtr & type)
{
const auto type_without_nullable = DB::removeNullable(type);

if (const auto * array_type = typeid_cast<const DB::DataTypeArray *>(type_without_nullable.get()))
return needNullNormalization(array_type->getNestedType());

if (const auto * map_type = typeid_cast<const DB::DataTypeMap *>(type_without_nullable.get()))
return needNullNormalization(map_type->getKeyType()) || needNullNormalization(map_type->getValueType());

if (const auto * tuple_type = typeid_cast<const DB::DataTypeTuple *>(type_without_nullable.get()))
{
const auto & element_types = tuple_type->getElements();
return std::any_of(element_types.begin(), element_types.end(), needNullNormalization);
}

return false;
}

explicit SparkRowReader(const DB::DataTypes & field_types_)
: field_types(field_types_)
, num_fields(field_types.size())
, bit_set_width_in_bytes(static_cast<int32_t>(calculateBitSetWidthInBytes(num_fields)))
, field_offsets(num_fields)
, support_raw_datas(num_fields)
, is_big_endians_in_spark_row(num_fields)
, need_nested_null_normalizations(num_fields)
, fixed_length_data_readers(num_fields)
, variable_length_data_readers(num_fields)
{
Expand All @@ -186,6 +210,7 @@ class SparkRowReader
field_offsets[ordinal] = bit_set_width_in_bytes + ordinal * 8L;
support_raw_datas[ordinal] = BackingDataLengthCalculator::isDataTypeSupportRawData(type_without_nullable);
is_big_endians_in_spark_row[ordinal] = BackingDataLengthCalculator::isBigEndianInSparkRow(type_without_nullable);
need_nested_null_normalizations[ordinal] = needNestedNullNormalization(field_types[ordinal]);
if (BackingDataLengthCalculator::isFixedLengthDataType(type_without_nullable))
fixed_length_data_readers[ordinal] = std::make_shared<FixedLengthDataReader>(field_types[ordinal]);
else if (BackingDataLengthCalculator::isVariableLengthDataType(type_without_nullable))
Expand All @@ -209,6 +234,12 @@ class SparkRowReader
return is_big_endians_in_spark_row[ordinal];
}

bool needNestedNullNormalization(size_t ordinal) const
{
assertIndexIsValid(ordinal);
return need_nested_null_normalizations[ordinal];
}

std::shared_ptr<FixedLengthDataReader> getFixedLengthDataReader(int ordinal) const
{
assertIndexIsValid(ordinal);
Expand Down Expand Up @@ -374,11 +405,17 @@ class SparkRowReader
std::vector<int64_t> field_offsets;
std::vector<bool> support_raw_datas;
std::vector<bool> is_big_endians_in_spark_row;
std::vector<bool> need_nested_null_normalizations;
std::vector<std::shared_ptr<FixedLengthDataReader>> fixed_length_data_readers;
std::vector<std::shared_ptr<VariableLengthDataReader>> variable_length_data_readers;

const char * buffer;
int32_t length;

static bool needNullNormalization(const DB::DataTypePtr & type)
{
return !type->isNullable() || needNestedNullNormalization(type);
}
};

}
22 changes: 15 additions & 7 deletions cpp-ch/local-engine/Parser/TypeParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ DB::DataTypePtr TypeParser::getCHTypeByName(const String & spark_type_name)
return DB::DataTypeFactory::instance().get(ch_type_name);
}

DB::DataTypePtr TypeParser::parseType(const substrait::Type & substrait_type, std::list<String> * field_names)
DB::DataTypePtr TypeParser::parseType(
const substrait::Type & substrait_type, std::list<String> * field_names, bool keep_list_nullability)
{
DB::DataTypePtr ch_type = nullptr;

Expand Down Expand Up @@ -185,14 +186,14 @@ DB::DataTypePtr TypeParser::parseType(const substrait::Type & substrait_type, st
for (int i = 0; i < types.size(); ++i)
{
struct_field_names.push_back(field_names->front());
struct_field_types[i] = parseType(types[i], field_names);
struct_field_types[i] = parseType(types[i], field_names, keep_list_nullability);
}
}
else
{
/// Construct CH tuple type without DFS rule.
for (int i = 0; i < types.size(); ++i)
struct_field_types[i] = parseType(types[i]);
struct_field_types[i] = parseType(types[i], nullptr, keep_list_nullability);

const auto & names = substrait_type.struct_().names();
for (const auto & name : names)
Expand All @@ -209,9 +210,16 @@ DB::DataTypePtr TypeParser::parseType(const substrait::Type & substrait_type, st
}
else if (substrait_type.has_list())
{
auto ch_nested_type = parseType(substrait_type.list().type());
auto ch_nested_type = parseType(substrait_type.list().type(), nullptr, true);
ch_type = std::make_shared<DB::DataTypeArray>(ch_nested_type);
ch_type = tryWrapNullable(substrait_type.list().nullability(), ch_type);
/// ClickHouse doesn't support Nullable(Array(...)) well in many execution paths.
/// In our parquet reader path, null arrays are represented as empty arrays (no null map).
/// So for top-level Substrait LIST, we intentionally drop outer nullability and keep Array(...).
///
/// Note: element nullability is still preserved via ch_nested_type; if the element is also a LIST,
/// its own nullability must be preserved to represent Array(Nullable(Array(...))).
if (keep_list_nullability)
ch_type = tryWrapNullable(substrait_type.list().nullability(), ch_type);
}
else if (substrait_type.has_map())
{
Expand All @@ -223,8 +231,8 @@ DB::DataTypePtr TypeParser::parseType(const substrait::Type & substrait_type, st
}
else
{
auto ch_key_type = parseType(substrait_type.map().key());
auto ch_val_type = parseType(substrait_type.map().value());
auto ch_key_type = parseType(substrait_type.map().key(), nullptr, keep_list_nullability);
auto ch_val_type = parseType(substrait_type.map().value(), nullptr, keep_list_nullability);
ch_type = std::make_shared<DB::DataTypeMap>(ch_key_type, ch_val_type);
ch_type = tryWrapNullable(substrait_type.map().nullability(), ch_type);
}
Expand Down
Loading
Loading