|
18 | 18 | #include "parquet_utils.h" |
19 | 19 | #include "enum_utils.h" |
20 | 20 | #include "compression_utils.h" |
| 21 | +#include "typed_buffer_values.h" |
21 | 22 | #include <cstring> |
22 | 23 | #include <iostream> |
23 | 24 |
|
24 | 25 | using namespace dbps::external; |
25 | 26 | using namespace dbps::enum_utils; |
26 | 27 | using namespace dbps::compression; |
| 28 | +using namespace dbps::processing; |
27 | 29 |
|
28 | 30 | // ----------------------------------------------------------------------------- |
29 | 31 | // Process Parquet formatted Dictionary and Data pages |
@@ -377,3 +379,63 @@ std::vector<uint8_t> GetTypedListAsValueBytes( |
377 | 379 | } |
378 | 380 |
|
379 | 381 | // ----------------------------------------------------------------------------- |
| 382 | +// Build Parquet formatted value bytes into TypedValuesBuffer |
| 383 | +// ----------------------------------------------------------------------------- |
| 384 | + |
| 385 | +TypedValuesBuffer ReinterpretValueBytesAsTypedValuesBuffer(tcb::span<const uint8_t> value_bytes, |
| 386 | + Type::type datatype, |
| 387 | + const std::optional<int>& datatype_length, |
| 388 | + Encoding::type encoding) { |
| 389 | + |
| 390 | + if (encoding == Encoding::RLE_DICTIONARY) { |
| 391 | + throw DBPSUnsupportedException( |
| 392 | + "Unsupported encoding: RLE_DICTIONARY is not supported for per-value operations " |
| 393 | + "since values are not present in the data, only references to them."); |
| 394 | + } |
| 395 | + |
| 396 | + if (encoding != Encoding::PLAIN) { |
| 397 | + throw DBPSUnsupportedException( |
| 398 | + "On ReinterpretValueBytesAsTypedValuesBuffer, unsupported encoding: " |
| 399 | + + std::string(to_string(encoding))); |
| 400 | + } |
| 401 | + |
| 402 | + if (datatype == Type::BOOLEAN) { |
| 403 | + throw DBPSUnsupportedException("On ReinterpretValueBytesAsTypedValuesBuffer, BOOLEAN datatype " |
| 404 | + "values are bit-encoded and not expanded as bytes, so BOOLEAN is not supported."); |
| 405 | + } |
| 406 | + |
| 407 | + switch (datatype) { |
| 408 | + case Type::INT32: |
| 409 | + return TypedBufferI32{value_bytes}; |
| 410 | + case Type::INT64: |
| 411 | + return TypedBufferI64{value_bytes}; |
| 412 | + case Type::FLOAT: |
| 413 | + return TypedBufferFloat{value_bytes}; |
| 414 | + case Type::DOUBLE: |
| 415 | + return TypedBufferDouble{value_bytes}; |
| 416 | + case Type::INT96: |
| 417 | + return TypedBufferInt96{value_bytes}; |
| 418 | + case Type::FIXED_LEN_BYTE_ARRAY: { |
| 419 | + if (!datatype_length.has_value() || datatype_length.value() <= 0) { |
| 420 | + throw InvalidInputException("FIXED_LEN_BYTE_ARRAY requires a positive datatype_length"); |
| 421 | + } |
| 422 | + return TypedBufferRawBytesFixedSized{ |
| 423 | + value_bytes, 0, RawBytesFixedSizedCodec{static_cast<size_t>(datatype_length.value())}}; |
| 424 | + } |
| 425 | + case Type::BYTE_ARRAY: |
| 426 | + return TypedBufferRawBytesVariableSized{value_bytes}; |
| 427 | + default: |
| 428 | + throw InvalidInputException( |
| 429 | + "Invalid datatype: " + std::string(to_string(datatype))); |
| 430 | + } |
| 431 | +} |
| 432 | + |
| 433 | +std::vector<uint8_t> GetTypedValuesBufferAsValueBytes(TypedValuesBuffer buffer) { |
| 434 | + // std::visit is needed to unwrap the variant. TypedValuesBuffer could be of different ByteBuffer types, |
| 435 | + // so the indirection is needed to call FinalizeAndTakeBuffer() on the correct type. In practice, this is the same for all. |
| 436 | + return std::visit([](auto& buf) -> std::vector<uint8_t> { |
| 437 | + return buf.FinalizeAndTakeBuffer(); |
| 438 | + }, buffer); |
| 439 | +} |
| 440 | + |
| 441 | +// ----------------------------------------------------------------------------- |
0 commit comments