Skip to content

Commit efac4b1

Browse files
committed
- Added multiple performance optimizations to ByteBuffer class.
- Added sequential write checking for variable-size elements. - Added heuristics for reserve count in variable-size parsing for read-only buffers. - Added unittests.
1 parent 5d5357e commit efac4b1

3 files changed

Lines changed: 194 additions & 34 deletions

File tree

src/processing/byte_buffer.cpp

Lines changed: 89 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "bytes_utils.h"
2121
#include "exceptions.h"
2222
#include <algorithm>
23+
#include <cmath>
2324
#include <cstring>
2425
#include <limits>
2526

@@ -81,15 +82,15 @@ void ByteBuffer::InitializeFromSpan() {
8182
// Variable-size layout stores [u32 size][element value] back-to-back.
8283
// Single pass validates shape and captures per-element prefix offsets.
8384
offsets_.clear();
84-
// TODO: consider a heuristic reserve estimate to reduce offsets_ reallocations.
85+
offsets_.reserve(EstimateOffsetsReserveCountFromSample(elements_span_));
8586
size_t cursor = 0;
8687
while (cursor < elements_span_.size()) {
87-
if (elements_span_.size() - cursor < 4) {
88+
if (elements_span_.size() - cursor < kSizePrefixBytes) {
8889
throw InvalidInputException("Malformed variable-size buffer: truncated length prefix");
8990
}
9091
offsets_.push_back(cursor);
9192
const size_t current_element_size = ReadSizeAt(elements_span_, cursor);
92-
cursor += 4;
93+
cursor += kSizePrefixBytes;
9394
if (elements_span_.size() - cursor < current_element_size) {
9495
throw InvalidInputException("Malformed variable-size buffer: truncated element payload");
9596
}
@@ -100,6 +101,46 @@ void ByteBuffer::InitializeFromSpan() {
100101
num_elements_ = offsets_.size();
101102
}
102103

104+
size_t ByteBuffer::EstimateOffsetsReserveCountFromSample(tcb::span<const uint8_t> bytes) {
105+
if (bytes.empty())
106+
return 0;
107+
108+
// Sample the first 10 elements to estimate the total element count.
109+
size_t cursor = 0;
110+
size_t sampled_elements = 0;
111+
while (cursor < bytes.size() && sampled_elements < 10) {
112+
if (bytes.size() - cursor < kSizePrefixBytes) {
113+
throw InvalidInputException("Malformed variable-size buffer: truncated length prefix");
114+
}
115+
const size_t current_element_size = ReadSizeAt(bytes, cursor);
116+
cursor += kSizePrefixBytes;
117+
if (bytes.size() - cursor < current_element_size) {
118+
throw InvalidInputException("Malformed variable-size buffer: truncated element payload");
119+
}
120+
cursor += current_element_size;
121+
++sampled_elements;
122+
}
123+
124+
if (sampled_elements == 0 || cursor == 0)
125+
return 0;
126+
127+
// If sampling consumed the full buffer (<= sample window), we already know the exact count.
128+
if (cursor == bytes.size())
129+
return sampled_elements;
130+
131+
// Estimate total element count from sample density:
132+
// (sampled_elements / sampled_bytes) * total_bytes.
133+
// - sampled_elements / sampled_bytes gives "elements per byte" in the sampled prefix,
134+
// - then multiplying by total_bytes extrapolates a full-buffer estimate.
135+
const long double estimated =
136+
(static_cast<long double>(bytes.size()) * static_cast<long double>(sampled_elements)) /
137+
static_cast<long double>(cursor);
138+
const size_t estimated_count = static_cast<size_t>(std::ceil(estimated));
139+
const size_t estimated_with_headroom =
140+
static_cast<size_t>(std::ceil(static_cast<long double>(estimated_count) * 1.1L));
141+
return std::max(estimated_with_headroom, sampled_elements);
142+
}
143+
103144
// -----------------------------------------------------------------------------
104145
// Span reader methods
105146
// -----------------------------------------------------------------------------
@@ -130,7 +171,7 @@ tcb::span<const uint8_t> ByteBuffer::GetElement(size_t position) const {
130171
throw InvalidInputException("Element position has not been written yet");
131172
}
132173
const size_t element_size = ReadSizeAt(elements_span_, offset);
133-
return elements_span_.subspan(offset + 4, element_size);
174+
return elements_span_.subspan(offset + kSizePrefixBytes, element_size);
134175
}
135176

136177
// -----------------------------------------------------------------------------
@@ -186,7 +227,7 @@ void ByteBuffer::InitializeForWriteBuffer(size_t variable_size_reserved_bytes_hi
186227
// Reserve write_buffer to at least the size of the prefix [u32 size] bytes for all elements,
187228
// and use a larger reserved-bytes hint if given as a best guess to reduce reallocations.
188229
// write_buffer is not initialized to anything since we will be appending to it during SetElement, just reserving capacity.
189-
const size_t min_required_prefix_bytes = num_elements_ * static_cast<size_t>(sizeof(uint32_t));
230+
const size_t min_required_prefix_bytes = num_elements_ * kSizePrefixBytes;
190231
const size_t variable_size_reserved_bytes = std::max(variable_size_reserved_bytes_hint, min_required_prefix_bytes);
191232
write_buffer_.clear();
192233
write_buffer_.reserve(variable_size_reserved_bytes);
@@ -195,6 +236,9 @@ void ByteBuffer::InitializeForWriteBuffer(size_t variable_size_reserved_bytes_hi
195236
offsets_.clear();
196237
offsets_.resize(num_elements_, kUnsetVariableElementOffset);
197238

239+
// next_expected_sequential_position_ is initialized to 0 for sequential write checking.
240+
next_expected_sequential_position_ = 0;
241+
198242
// elements_span_ is re-bound to the write buffer.
199243
RebindSpanToWriteBuffer();
200244
}
@@ -239,7 +283,16 @@ void ByteBuffer::SetElement(size_t position, tcb::span<const uint8_t> element) {
239283
const size_t offset = write_buffer_.size();
240284
offsets_[position] = offset;
241285
append_u32_le(write_buffer_, static_cast<uint32_t>(element.size()));
242-
write_buffer_.insert(write_buffer_.end(), element.begin(), element.end());
286+
write_buffer_.insert(write_buffer_.end(), element.begin(), element.end()); // Appends at the end of the buffer.
287+
288+
// Update next_expected_sequential_position_ for sequential write checking.
289+
if (next_expected_sequential_position_ != kUnsetVariableElementOffset) {
290+
if (position == next_expected_sequential_position_) {
291+
next_expected_sequential_position_ += 1;
292+
} else {
293+
next_expected_sequential_position_ = kUnsetVariableElementOffset;
294+
}
295+
}
243296

244297
RebindSpanToWriteBuffer();
245298
}
@@ -259,49 +312,51 @@ std::vector<uint8_t> ByteBuffer::FinalizeAndTakeBuffer() {
259312
return std::move(write_buffer_);
260313
}
261314

262-
// Variable-size: validate all offsets and records first, and detect whether
263-
// the buffer is already sequential/unfragmented.
264-
bool is_sequential = true;
265-
size_t current_offset = 0;
315+
// For variable-size when all elements were written exactly once and in sequential order,
316+
// we can skip out-of-order or fragmentation checks. This is the fast path.
317+
// This is the most common behavior when writing elements in single threaded mode.
318+
if (next_expected_sequential_position_ == num_elements_) {
319+
if (num_elements_ > 0) {
320+
const size_t last_element_offset = offsets_[num_elements_ - 1];
321+
const size_t last_element_size = ReadSizeAt(elements_span_, last_element_offset);
322+
const size_t logical_size = last_element_offset + kSizePrefixBytes + last_element_size;
323+
if (logical_size != write_buffer_.size()) {
324+
throw InvalidInputException("FinalizeAndTakeBuffer: trailing bytes detected beyond last element");
325+
}
326+
}
327+
write_buffer_finalized_ = true;
328+
return std::move(write_buffer_);
329+
}
330+
331+
// For variable-size, when elements are written out of order, assume the buffer is fragmented and potentially with orphaned bytes
332+
// The buffer is validated and rebuilt into a compact buffer in one pass.
333+
std::vector<uint8_t> result;
334+
result.reserve(write_buffer_.size());
266335
for (size_t i = 0; i < num_elements_; ++i) {
267336
const size_t element_offset = offsets_[i];
268337
if (element_offset == kUnsetVariableElementOffset) {
269338
throw InvalidInputException("Cannot finalize variable-size buffer: not all elements were written");
270339
}
271-
if (element_offset > write_buffer_.size() || (write_buffer_.size() - element_offset) < 4) {
340+
if (element_offset > write_buffer_.size() || (write_buffer_.size() - element_offset) < kSizePrefixBytes) {
272341
throw InvalidInputException("Cannot finalize variable-size buffer: invalid element offset");
273342
}
274343

275344
const size_t element_size = ReadSizeAt(elements_span_, element_offset);
276-
if (element_size > (write_buffer_.size() - element_offset - 4)) {
345+
if (element_size > (write_buffer_.size() - element_offset - kSizePrefixBytes)) {
277346
throw InvalidInputException("Cannot finalize variable-size buffer: malformed element payload");
278347
}
279348

280-
if (element_offset != current_offset) {
281-
is_sequential = false;
282-
}
283-
current_offset += 4 + element_size;
284-
}
285-
// Detect trailing/orphan bytes in write_buffer_ and force rewrite if present.
286-
if (current_offset != write_buffer_.size()) {
287-
is_sequential = false;
349+
const size_t record_size = kSizePrefixBytes + element_size;
350+
result.insert(
351+
result.end(),
352+
write_buffer_.data() + element_offset,
353+
write_buffer_.data() + element_offset + record_size);
288354
}
289355

290-
// If sequential, transfer ownership of write_buffer_ directly.
291-
if (is_sequential) {
292-
write_buffer_finalized_ = true;
293-
return std::move(write_buffer_);
294-
}
356+
// Defrag path returns a new buffer; release the original fragmented write buffer.
357+
write_buffer_.clear();
358+
write_buffer_.shrink_to_fit();
295359

296-
// Fragmented: rebuild a compact buffer in element order.
297-
std::vector<uint8_t> result;
298-
result.reserve(current_offset);
299-
for (size_t i = 0; i < num_elements_; ++i) {
300-
const size_t element_offset = offsets_[i];
301-
const size_t elem_size = ReadSizeAt(elements_span_, element_offset);
302-
const uint8_t* start = write_buffer_.data() + element_offset;
303-
result.insert(result.end(), start, start + 4 + elem_size);
304-
}
305360
write_buffer_finalized_ = true;
306361
return result;
307362
}

src/processing/byte_buffer.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ class ByteBuffer {
5858
std::vector<uint8_t> FinalizeAndTakeBuffer();
5959

6060
protected:
61+
// Helper for reserve heuristics in variable-size parsing.
62+
static size_t EstimateOffsetsReserveCountFromSample(tcb::span<const uint8_t> bytes);
63+
64+
// Helper for calculating the offset of an element by position.
6165
size_t CalculateOffsetOfElement(size_t position) const;
6266

6367
// Variables for span elements reading
@@ -73,6 +77,11 @@ class ByteBuffer {
7377
std::vector<uint8_t> write_buffer_;
7478
bool write_buffer_finalized_ = false;
7579

80+
// Variable for sequential variable-size writes.
81+
// Tracks next expected position for sequential variable-size writes.
82+
// Value is invalidated to kUnsetVariableElementOffset once order is violated.
83+
size_t next_expected_sequential_position_ = 0;
84+
7685
private:
7786
// Initialization methods for read-only buffer
7887
void InitializeFromSpan();
@@ -83,5 +92,6 @@ class ByteBuffer {
8392
};
8493

8594
inline constexpr size_t kUnsetVariableElementOffset = std::numeric_limits<size_t>::max();
95+
inline constexpr size_t kSizePrefixBytes = sizeof(uint32_t); // [u32 size] prefix for variable-size elements.
8696

8797
} // namespace dbps::processing

src/processing/byte_buffer_test.cpp

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <vector>
2222
#include <cstdint>
2323

24+
#include "bytes_utils.h"
2425
#include "exceptions.h"
2526

2627
using dbps::processing::ByteBuffer;
@@ -44,12 +45,16 @@ class ByteBufferTestProxy : public ByteBuffer {
4445
: ByteBuffer(num_elements, reserved_bytes_hint, use_reserve_hint) {}
4546
ByteBufferTestProxy(size_t num_elements, size_t element_size)
4647
: ByteBuffer(num_elements, element_size) {}
48+
using ByteBuffer::EstimateOffsetsReserveCountFromSample;
4749

4850
size_t GetNumElements() const { return num_elements_; }
4951
bool GetHasFixedSizedElements() const { return has_fixed_sized_elements_; }
5052
size_t GetElementSize() const { return element_size_; }
5153
const std::vector<size_t>& GetOffsets() const { return offsets_; }
5254
const std::vector<uint8_t>& GetWriteBuffer() const { return write_buffer_; }
55+
void AppendTrailingBytesForTest(tcb::span<const uint8_t> bytes) {
56+
write_buffer_.insert(write_buffer_.end(), bytes.begin(), bytes.end());
57+
}
5358
};
5459

5560
namespace {
@@ -123,6 +128,60 @@ TEST(ByteBufferTest, ConstructVariableSize_ValidEncodedBuffer_InitializesExpecte
123128
EXPECT_EQ(buffer.GetOffsets()[1], 9u); // 4 bytes length prefix + 5 bytes first payload.
124129
}
125130

131+
TEST(ByteBufferTest, EstimateOffsetsReserveCountFromSample_MultipleCases) {
132+
const auto make_variable_size_bytes = [](const std::vector<size_t>& sizes) {
133+
std::vector<uint8_t> bytes;
134+
for (size_t idx = 0; idx < sizes.size(); ++idx) {
135+
append_u32_le(bytes, static_cast<uint32_t>(sizes[idx]));
136+
for (size_t j = 0; j < sizes[idx]; ++j) {
137+
bytes.push_back(static_cast<uint8_t>((idx + j) & 0xFF));
138+
}
139+
}
140+
return bytes;
141+
};
142+
143+
// Empty buffer.
144+
{
145+
const std::vector<size_t> sizes = {};
146+
const std::vector<uint8_t> bytes = make_variable_size_bytes(sizes);
147+
const size_t estimated = ByteBufferTestProxy::EstimateOffsetsReserveCountFromSample(tcb::span<const uint8_t>(bytes));
148+
EXPECT_EQ(estimated, 0u);
149+
}
150+
151+
// Buffer with less than sample size (5 vs 10): exact count, no extrapolation/headroom needed.
152+
{
153+
const std::vector<size_t> sizes = {1u, 2u, 50u, 4u, 7u};
154+
const std::vector<uint8_t> bytes = make_variable_size_bytes(sizes);
155+
const size_t estimated = ByteBufferTestProxy::EstimateOffsetsReserveCountFromSample(tcb::span<const uint8_t>(bytes));
156+
EXPECT_EQ(estimated, sizes.size());
157+
}
158+
159+
// Buffer with uniform record sizes. Base estimate is exact, then +10% headroom is applied.
160+
{
161+
// 30 elements, each 6 bytes long.
162+
const size_t num_elements = 30u;
163+
const size_t payload_size = 6u;
164+
std::vector<uint8_t> bytes;
165+
for (size_t i = 0; i < num_elements; ++i) {
166+
append_u32_le(bytes, static_cast<uint32_t>(payload_size));
167+
for (size_t j = 0; j < payload_size; ++j) {
168+
bytes.push_back(static_cast<uint8_t>((i + j) & 0xFF));
169+
}
170+
}
171+
const size_t estimate = ByteBufferTestProxy::EstimateOffsetsReserveCountFromSample(tcb::span<const uint8_t>(bytes));
172+
EXPECT_EQ(estimate, 33u); //33 = 30 elements + 10% headroom.
173+
}
174+
175+
// Buffer to estimate intentionally overshoots the true element count.
176+
{
177+
// First 10 elements are very small, tail elements are very large:
178+
const std::vector<size_t> sizes = {1u, 1u, 1u, 1u, 1u, 1u, 1u, 1u, 1u, 1u, 100u, 100u};
179+
const std::vector<uint8_t> bytes = make_variable_size_bytes(sizes);
180+
const size_t estimate = ByteBufferTestProxy::EstimateOffsetsReserveCountFromSample(tcb::span<const uint8_t>(bytes));
181+
EXPECT_GT(estimate, sizes.size());
182+
}
183+
}
184+
126185
TEST(ByteBufferTest, GetElement_VariableSize_ReturnsExpectedPayload) {
127186
// [len=5]["ABCDE"][len=7]["1234567"]
128187
std::vector<uint8_t> bytes = {
@@ -293,6 +352,7 @@ TEST(ByteBufferTest, VariableSizeWrite_ExactHint_NoReallocationAndExactUsedSize)
293352

294353
EXPECT_EQ(buffer.GetWriteBuffer().size(), exact_hint_bytes);
295354
EXPECT_EQ(buffer.GetWriteBuffer().capacity(), initial_capacity);
355+
EXPECT_EQ(buffer.GetWriteBuffer().capacity(), exact_hint_bytes);
296356
EXPECT_EQ(buffer.GetWriteBuffer().data(), initial_data_ptr);
297357

298358
for (size_t i = 0; i < num_elements; ++i) {
@@ -302,6 +362,10 @@ TEST(ByteBufferTest, VariableSizeWrite_ExactHint_NoReallocationAndExactUsedSize)
302362
EXPECT_EQ(value[j], payloads[i][j]);
303363
}
304364
}
365+
366+
const uint8_t* const data_ptr_before_finalize = buffer.GetWriteBuffer().data();
367+
std::vector<uint8_t> final_buffer = buffer.FinalizeAndTakeBuffer();
368+
EXPECT_EQ(final_buffer.data(), data_ptr_before_finalize); // Same allocation: finalize returned write_buffer_ as-is.
305369
}
306370

307371
TEST(ByteBufferTest, VariableSizeWrite_ExceedsHint_ReallocatesBuffer) {
@@ -373,6 +437,26 @@ TEST(ByteBufferTest, FinalizeAndTakeBuffer_VariableSize_Sequential_ReturnsAsIs)
373437
EXPECT_EQ(final_buffer.data(), data_ptr_before); // Same allocation (moved, not copied).
374438
}
375439

440+
TEST(ByteBufferTest, FinalizeAndTakeBuffer_VariableSize_SequentialWithTrailingBytes_Throws) {
441+
ByteBufferTestProxy buffer(3u, 64u, true);
442+
std::vector<uint8_t> first = {0x10, 0x11};
443+
std::vector<uint8_t> second = {0x20, 0x21, 0x22};
444+
std::vector<uint8_t> third = {0x30};
445+
446+
buffer.SetElement(0, tcb::span<const uint8_t>(first));
447+
buffer.SetElement(1, tcb::span<const uint8_t>(second));
448+
buffer.SetElement(2, tcb::span<const uint8_t>(third));
449+
450+
const size_t expected_trimmed_size =
451+
(4u + first.size()) + (4u + second.size()) + (4u + third.size());
452+
453+
std::vector<uint8_t> trailing = {0xEE, 0xEF, 0xF0};
454+
buffer.AppendTrailingBytesForTest(tcb::span<const uint8_t>(trailing));
455+
EXPECT_EQ(buffer.GetWriteBuffer().size(), expected_trimmed_size + trailing.size());
456+
457+
EXPECT_THROW((void)buffer.FinalizeAndTakeBuffer(), InvalidInputException);
458+
}
459+
376460
TEST(ByteBufferTest, FinalizeAndTakeBuffer_VariableSize_OutOfOrder_Defragments) {
377461
ByteBufferTestProxy buffer(3u, 64u, true);
378462
std::vector<uint8_t> first = {0x10, 0x11};
@@ -484,6 +568,17 @@ TEST(ByteBufferTest, SetElement_FixedSize_OutOfOrderAndOverwrite_ReturnsLatestVa
484568
EXPECT_EQ(e1[1], v1_first[1]);
485569
EXPECT_EQ(e2[0], v2_second[0]);
486570
EXPECT_EQ(e2[1], v2_second[1]);
571+
572+
const uint8_t* const data_ptr_before_finalize = buffer.GetWriteBuffer().data();
573+
std::vector<uint8_t> final_buffer = buffer.FinalizeAndTakeBuffer();
574+
EXPECT_EQ(final_buffer.data(), data_ptr_before_finalize); // Fixed-size finalize should move write_buffer_ directly.
575+
ASSERT_EQ(final_buffer.size(), 6u);
576+
EXPECT_EQ(final_buffer[0], v0_second[0]);
577+
EXPECT_EQ(final_buffer[1], v0_second[1]);
578+
EXPECT_EQ(final_buffer[2], v1_first[0]);
579+
EXPECT_EQ(final_buffer[3], v1_first[1]);
580+
EXPECT_EQ(final_buffer[4], v2_second[0]);
581+
EXPECT_EQ(final_buffer[5], v2_second[1]);
487582
}
488583

489584
TEST(ByteBufferTest, SetElement_FixedSize_WrongPayloadSize_Throws) {

0 commit comments

Comments
 (0)