Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cpp-ch/clickhouse.version
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20260425-25.12.10.7
CH_COMMIT=54c5bf9a97b
CH_BRANCH=rebase_ch/20260425-25.12.10.7-complextype
CH_COMMIT=b1510a2
11 changes: 11 additions & 0 deletions cpp-ch/local-engine/Common/CHUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,17 @@ BlockUtil::convertColumnAsNecessary(const DB::ColumnWithTypeAndName & column, co
DB::JoinCommon::convertColumnToNullable(nullable_column);
return {nullable_column.column, sample_column.type, sample_column.name};
}
else if (!sample_column.type->isNullable() && column.type->isNullable() && sample_column.type->equals(*DB::removeNullable(column.type)))
{
const auto * nullable_column = DB::checkAndGetColumn<DB::ColumnNullable>(column.column.get());
if (!nullable_column || std::ranges::any_of(nullable_column->getNullMapData(), [](UInt8 is_null) { return is_null != 0; }))
throw DB::Exception(
DB::ErrorCodes::LOGICAL_ERROR,
"Cannot convert nullable column with NULL values to non-nullable type. original:{} expected:{}",
column.dumpStructure(),
sample_column.dumpStructure());
return {nullable_column->getNestedColumnPtr(), sample_column.type, sample_column.name};
}
else
throw DB::Exception(
DB::ErrorCodes::LOGICAL_ERROR,
Expand Down
15 changes: 11 additions & 4 deletions cpp-ch/local-engine/Parser/ExpressionParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,13 @@ ExpressionParser::addConstColumn(DB::ActionsDAG & actions_dag, const DB::DataTyp
{
String name = DB::fieldToString(field).substr(0, 10);
name = getUniqueName(name);
const auto * res_node = &actions_dag.addColumn(DB::ColumnWithTypeAndName(type->createColumnConst(1, field), type, name));
/// Substrait null literals carry a concrete type; CH `createColumnConst` inserts into the nested column (e.g. ColumnArray),
/// which cannot accept `Field::Null` unless the type is Nullable(...).
DB::DataTypePtr const_type = type;
if (field.isNull() && !type->isNullable())
const_type = makeNullable(type);
const auto * res_node = &actions_dag.addColumn(
DB::ColumnWithTypeAndName(const_type->createColumnConst(1, field), const_type, name));
if (reuseCSE())
{
// The new node, res_node will be remained in the ActionsDAG, but it will not affect the execution.
Expand Down Expand Up @@ -307,6 +313,7 @@ ExpressionParser::NodeRawConstPtr ExpressionParser::parseExpression(ActionsDAG &
const auto & input_type = args[0]->result_type;
DataTypePtr denull_input_type = removeNullable(input_type);
DataTypePtr output_type = TypeParser::parseType(substrait_type);
DataTypePtr cast_output_type = input_type->isNullable() && !output_type->isNullable() ? makeNullable(output_type) : output_type;
DataTypePtr denull_output_type = removeNullable(output_type);
const ActionsDAG::Node * result_node = nullptr;
if (substrait_type.has_binary())
Expand Down Expand Up @@ -351,7 +358,7 @@ ExpressionParser::NodeRawConstPtr ExpressionParser::parseExpression(ActionsDAG &
else if (isString(denull_input_type) && substrait_type.has_bool_())
{
/// cast(string to boolean)
args.emplace_back(addConstColumn(actions_dag, std::make_shared<DataTypeString>(), output_type->getName()));
args.emplace_back(addConstColumn(actions_dag, std::make_shared<DataTypeString>(), cast_output_type->getName()));
result_node = toFunctionNode(actions_dag, "accurateCastOrNull", args);
}
else if (isString(denull_input_type) && isInt(denull_output_type))
Expand All @@ -360,13 +367,13 @@ ExpressionParser::NodeRawConstPtr ExpressionParser::parseExpression(ActionsDAG &
/// Refer to https://github.com/apache/gluten/issues/4956 and https://github.com/apache/gluten/issues/8598
const auto * trim_str_arg = addConstColumn(actions_dag, std::make_shared<DataTypeString>(), " \t\n\r\f");
args[0] = toFunctionNode(actions_dag, "trimBothSpark", {args[0], trim_str_arg});
args.emplace_back(addConstColumn(actions_dag, std::make_shared<DataTypeString>(), output_type->getName()));
args.emplace_back(addConstColumn(actions_dag, std::make_shared<DataTypeString>(), cast_output_type->getName()));
result_node = toFunctionNode(actions_dag, "CAST", args);
}
else
{
/// Common process: CAST(input, type)
args.emplace_back(addConstColumn(actions_dag, std::make_shared<DataTypeString>(), output_type->getName()));
args.emplace_back(addConstColumn(actions_dag, std::make_shared<DataTypeString>(), cast_output_type->getName()));
result_node = toFunctionNode(actions_dag, "CAST", args);
}

Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ QueryPlanStepPtr ReadRelParser::parseReadRelWithLocalFile(const substrait::ReadR
auto source = std::make_shared<SubstraitFileSource>(getContext(), header, local_files);
auto source_pipe = Pipe(source);
auto source_step = std::make_unique<SubstraitFileSourceStep>(getContext(), std::move(source_pipe), "substrait local files");
if (format_settings.parquet.use_native_reader_v3 && !readRowIndex && onlyFlatType)
if (format_settings.parquet.use_native_reader_v3 && !readRowIndex)
source_step->setStepDescription("ParquetReaderV3");
Comment on lines +204 to 205
Comment on lines +204 to 205
else
source_step->setStepDescription("ParquetReader");
Expand Down
3 changes: 3 additions & 0 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <Core/Names.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/IDataType.h>
#include <Functions/FunctionFactory.h>
Expand Down Expand Up @@ -148,6 +149,8 @@ void SerializedPlanParser::adjustOutput(const DB::QueryPlanPtr & query_plan, con
const auto & origin_column = origin_columns[i];
const auto & origin_type = origin_column.type;
auto final_type = TypeParser::parseType(output_schema.types(i));
if (origin_type->isNullable() && !final_type->isNullable())
final_type = makeNullable(final_type);

/// Intermediate aggregate data is special, no check here.
if (typeid_cast<const DataTypeAggregateFunction *>(origin_column.type.get()) || origin_type->equals(*final_type))
Expand Down
69 changes: 68 additions & 1 deletion cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/
#include "SparkRowToCHColumn.h"
#include <algorithm>
#include <memory>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
Expand Down Expand Up @@ -47,6 +48,69 @@ jmethodID SparkRowToCHColumn::spark_row_interator_hasNext = nullptr;
jmethodID SparkRowToCHColumn::spark_row_interator_next = nullptr;
jmethodID SparkRowToCHColumn::spark_row_iterator_nextBatch = nullptr;

static Field normalizeFieldForType(Field field, const DataTypePtr & type)
{
const auto type_without_nullable = removeNullable(type);

if (field.isNull())
return type->isNullable() ? field : type_without_nullable->getDefault();

if (type->isNullable())
return normalizeFieldForType(std::move(field), type_without_nullable);

if (const auto * array_type = typeid_cast<const DataTypeArray *>(type_without_nullable.get()))
{
if (field.getType() != Field::Types::Array)
return field;

auto & array = field.safeGet<Array>();
const auto & nested_type = array_type->getNestedType();
for (auto & element : array)
element = normalizeFieldForType(std::move(element), nested_type);
return field;
}

if (const auto * map_type = typeid_cast<const DataTypeMap *>(type_without_nullable.get()))
{
if (field.getType() != Field::Types::Map)
return field;

auto & map = field.safeGet<Map>();
const auto & key_type = map_type->getKeyType();
const auto & value_type = map_type->getValueType();
for (auto & entry : map)
{
if (entry.isNull())
{
entry = Tuple{key_type->getDefault(), value_type->getDefault()};
continue;
}

auto & tuple = entry.safeGet<Tuple>();
if (tuple.size() == 2)
{
tuple[0] = normalizeFieldForType(std::move(tuple[0]), key_type);
tuple[1] = normalizeFieldForType(std::move(tuple[1]), value_type);
}
}
return field;
}

if (const auto * tuple_type = typeid_cast<const DataTypeTuple *>(type_without_nullable.get()))
{
if (field.getType() != Field::Types::Tuple)
return field;

auto & tuple = field.safeGet<Tuple>();
const auto & element_types = tuple_type->getElements();
for (size_t i = 0; i < std::min(tuple.size(), element_types.size()); ++i)
tuple[i] = normalizeFieldForType(std::move(tuple[i]), element_types[i]);
return field;
}

return field;
}

ALWAYS_INLINE static void writeRowToColumns(const std::vector<MutableColumnPtr> & columns, const SparkRowReader & spark_row_reader)
{
auto num_fields = columns.size();
Expand All @@ -64,7 +128,10 @@ ALWAYS_INLINE static void writeRowToColumns(const std::vector<MutableColumnPtr>
columns[i]->insert(spark_row_reader.getField(i)); // read decimal128
}
else
columns[i]->insert(spark_row_reader.getField(i));
{
DB::Field field = spark_row_reader.getField(i);
columns[i]->insert(normalizeFieldForType(std::move(field), spark_row_reader.getFieldTypes()[i]));
}
Comment on lines +131 to +145
}
}

Expand Down
22 changes: 15 additions & 7 deletions cpp-ch/local-engine/Parser/TypeParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ DB::DataTypePtr TypeParser::getCHTypeByName(const String & spark_type_name)
return DB::DataTypeFactory::instance().get(ch_type_name);
}

DB::DataTypePtr TypeParser::parseType(const substrait::Type & substrait_type, std::list<String> * field_names)
DB::DataTypePtr TypeParser::parseType(
const substrait::Type & substrait_type, std::list<String> * field_names, bool keep_list_nullability)
{
DB::DataTypePtr ch_type = nullptr;

Expand Down Expand Up @@ -185,14 +186,14 @@ DB::DataTypePtr TypeParser::parseType(const substrait::Type & substrait_type, st
for (int i = 0; i < types.size(); ++i)
{
struct_field_names.push_back(field_names->front());
struct_field_types[i] = parseType(types[i], field_names);
struct_field_types[i] = parseType(types[i], field_names, keep_list_nullability);
}
}
else
{
/// Construct CH tuple type without DFS rule.
for (int i = 0; i < types.size(); ++i)
struct_field_types[i] = parseType(types[i]);
struct_field_types[i] = parseType(types[i], nullptr, keep_list_nullability);

const auto & names = substrait_type.struct_().names();
for (const auto & name : names)
Expand All @@ -209,9 +210,16 @@ DB::DataTypePtr TypeParser::parseType(const substrait::Type & substrait_type, st
}
else if (substrait_type.has_list())
{
auto ch_nested_type = parseType(substrait_type.list().type());
auto ch_nested_type = parseType(substrait_type.list().type(), nullptr, true);
ch_type = std::make_shared<DB::DataTypeArray>(ch_nested_type);
ch_type = tryWrapNullable(substrait_type.list().nullability(), ch_type);
/// ClickHouse doesn't support Nullable(Array(...)) well in many execution paths.
/// In our parquet reader path, null arrays are represented as empty arrays (no null map).
/// So for top-level Substrait LIST, we intentionally drop outer nullability and keep Array(...).
///
/// Note: element nullability is still preserved via ch_nested_type; if the element is also a LIST,
/// its own nullability must be preserved to represent Array(Nullable(Array(...))).
if (keep_list_nullability)
ch_type = tryWrapNullable(substrait_type.list().nullability(), ch_type);
}
else if (substrait_type.has_map())
{
Expand All @@ -223,8 +231,8 @@ DB::DataTypePtr TypeParser::parseType(const substrait::Type & substrait_type, st
}
else
{
auto ch_key_type = parseType(substrait_type.map().key());
auto ch_val_type = parseType(substrait_type.map().value());
auto ch_key_type = parseType(substrait_type.map().key(), nullptr, keep_list_nullability);
auto ch_val_type = parseType(substrait_type.map().value(), nullptr, keep_list_nullability);
ch_type = std::make_shared<DB::DataTypeMap>(ch_key_type, ch_val_type);
ch_type = tryWrapNullable(substrait_type.map().nullability(), ch_type);
}
Expand Down
3 changes: 2 additions & 1 deletion cpp-ch/local-engine/Parser/TypeParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ namespace local_engine
static DB::DataTypePtr getCHTypeByName(const String& spark_type_name);

/// When parsing named structure, we need the field names.
static DB::DataTypePtr parseType(const substrait::Type& substrait_type, std::list<String>* field_names);
static DB::DataTypePtr
parseType(const substrait::Type& substrait_type, std::list<String>* field_names, bool keep_list_nullability = false);

inline static DB::DataTypePtr parseType(const substrait::Type& substrait_type)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ ParquetFormatFile::createInputFormat(const Block & header, const std::shared_ptr

// TODO: rebase-25.12, support complex types when there is a nullable type
// for example: parquet type is Array, requested type is Nullable(Array(Nullable(String)))
if (format_settings.parquet.use_native_reader_v3 && !readRowIndex && onlyFlatType)
if (format_settings.parquet.use_native_reader_v3 && !readRowIndex)
Comment on lines 222 to +224
{
LOG_TRACE(
&Poco::Logger::get("ParquetFormatFile"),
Expand Down
36 changes: 35 additions & 1 deletion cpp-ch/local-engine/tests/gtest_local_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
#include <iostream>
#include <incbin.h>

#include <Columns/ColumnNullable.h>
#include <Columns/ColumnsNumber.h>
#include <Disks/DiskLocal.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <Formats/FormatFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/registerInterpreters.h>
Expand Down Expand Up @@ -59,6 +63,36 @@ TEST(TESTUtil, TestByteToLong)
ASSERT_EQ(expected, result);
}

TEST(BlockUtil, ConvertNullableColumnAsNecessary)
{
auto string_type = std::make_shared<DataTypeString>();
auto nullable_string_type = std::make_shared<DataTypeNullable>(string_type);

auto nested = string_type->createColumn();
nested->insert("A");
nested->insert("B");
auto null_map = ColumnUInt8::create(2, 0);
ColumnWithTypeAndName nullable_column(
ColumnNullable::create(std::move(nested), std::move(null_map)), nullable_string_type, "col_1");
ColumnWithTypeAndName sample_column(string_type, "broadcast_right_col_1");

auto converted = BlockUtil::convertColumnAsNecessary(nullable_column, sample_column);
ASSERT_TRUE(converted.type->equals(*string_type));
ASSERT_EQ("broadcast_right_col_1", converted.name);
ASSERT_FALSE(converted.column->isNullable());

auto nested_with_null = string_type->createColumn();
nested_with_null->insert("A");
nested_with_null->insertDefault();
auto null_map_with_null = ColumnUInt8::create();
null_map_with_null->insertValue(0);
null_map_with_null->insertValue(1);
ColumnWithTypeAndName nullable_column_with_null(
ColumnNullable::create(std::move(nested_with_null), std::move(null_map_with_null)), nullable_string_type, "col_1");

ASSERT_THROW(BlockUtil::convertColumnAsNecessary(nullable_column_with_null, sample_column), DB::Exception);
}

TEST(ReadBufferFromFile, seekBackwards)
{
static constexpr size_t N = 256;
Expand Down Expand Up @@ -110,4 +144,4 @@ int main(int argc, char ** argv)

::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
}
35 changes: 35 additions & 0 deletions cpp-ch/local-engine/tests/gtest_spark_row.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,41 @@ TEST(SparkRow, ArrayMapTypes)
}


TEST(SparkRow, NestedNullsIntoNonNullableComplexTypes)
{
const auto source_array_type = std::make_shared<DataTypeArray>(
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>()));
const auto target_array_type = std::make_shared<DataTypeArray>(std::make_shared<DataTypeInt32>());
const auto source_tuple_type = std::make_shared<DataTypeTuple>(DataTypes{
std::make_shared<DataTypeNullable>(target_array_type),
source_array_type});
const auto target_tuple_type = std::make_shared<DataTypeTuple>(DataTypes{target_array_type, target_array_type});

DataTypeAndFields source_type_and_fields = {
{source_array_type, Array{Null{}, Int32(7)}},
{source_tuple_type, []() -> Field
{
Tuple tuple(2);
tuple[0] = Null{};
tuple[1] = Array{Null{}, Int32(3)};
return std::move(tuple);
}()},
};

SparkRowInfoPtr spark_row_info;
BlockPtr source_block;
std::tie(spark_row_info, source_block) = mockSparkRowInfoAndBlock(source_type_and_fields);

ColumnsWithTypeAndName target_columns(2);
target_columns[0] = ColumnWithTypeAndName(target_array_type, "a");
target_columns[1] = ColumnWithTypeAndName(target_tuple_type, "b");
Block target_header(target_columns);

auto out = SparkRowToCHColumn::convertSparkRowInfoToCHColumn(*spark_row_info, target_header);
EXPECT_EQ((*out->getByPosition(0).column)[0], Field(Array{Int32(0), Int32(7)}));
EXPECT_EQ((*out->getByPosition(1).column)[0], Field(Tuple{Array{}, Array{Int32(0), Int32(3)}}));
}

TEST(SparkRow, NullableComplexTypes)
{
const auto map_type = std::make_shared<DataTypeMap>(std::make_shared<DataTypeInt32>(), std::make_shared<DataTypeInt32>());
Expand Down
Loading