73 changes: 73 additions & 0 deletions docs/index_build_recovery.md
@@ -0,0 +1,73 @@
# Index Build Recovery and Online Build Notes

## Current recovery invariant

A committed valid index must have recoverable physical index storage. For ART indexes created after
the last checkpoint, `CREATE INDEX` writes a physical `CREATE_INDEX_RECORD` to the WAL. The record
contains the index catalog entry, `IndexInfo`, and serialized ART tree bytes. WAL replay restores the
catalog entry if needed, installs the ART bytes into data-file pages, and attaches the index to the
node table.

This mirrors how DuckDB and PostgreSQL handle small and normal-sized indexes: recovery replays
physical index storage instead of rescanning the base table.

## Large index optimization

For very large ART indexes, writing the full serialized tree into the WAL can duplicate many
gigabytes of data. `CREATE ART INDEX` switches to a blocking checkpoint-instead-of-WAL path when the
serialized ART tree is larger than the configured threshold. The statement returns a message saying
this path is active, commits a small catalog WAL record, and forces a blocking checkpoint before
returning to the user. If the process crashes before that checkpoint completes, WAL replay sees the
catalog index entry and rebuilds the physical ART index by scanning the base table.

The default threshold is 256 MiB. Tests can override it with
`LBUG_CREATE_INDEX_WAL_THRESHOLD=<bytes>`.
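
The threshold decision can be sketched roughly as follows. This is a minimal illustration, not the
code in this change: only the 256 MiB default and the `LBUG_CREATE_INDEX_WAL_THRESHOLD` variable name
come from the text above, while `IndexDurabilityPath`, `parseThresholdBytes`,
`configuredThresholdBytes`, and `chooseDurabilityPath` are hypothetical names, and the fallback to the
default on malformed input is an assumption.

```cpp
#include <cerrno>
#include <cstdint>
#include <cstdlib>

// Hypothetical names throughout; only the default size and the environment
// variable name are taken from the notes.
enum class IndexDurabilityPath { PHYSICAL_WAL_RECORD, BLOCKING_CHECKPOINT };

constexpr uint64_t DEFAULT_WAL_THRESHOLD_BYTES = 256ull * 1024 * 1024; // 256 MiB

// Parses a decimal byte count. Assumption: missing or malformed input falls
// back to the default instead of raising an error.
inline uint64_t parseThresholdBytes(const char* raw) {
    if (raw == nullptr || *raw < '0' || *raw > '9') {
        return DEFAULT_WAL_THRESHOLD_BYTES;
    }
    errno = 0;
    char* end = nullptr;
    const unsigned long long value = std::strtoull(raw, &end, 10);
    if (errno != 0 || *end != '\0') {
        return DEFAULT_WAL_THRESHOLD_BYTES;
    }
    return static_cast<uint64_t>(value);
}

// Reads the test override from the environment, if present.
inline uint64_t configuredThresholdBytes() {
    return parseThresholdBytes(std::getenv("LBUG_CREATE_INDEX_WAL_THRESHOLD"));
}

// "Larger than the configured threshold" is read here as a strict comparison.
inline IndexDurabilityPath chooseDurabilityPath(uint64_t serializedArtBytes,
    uint64_t thresholdBytes) {
    return serializedArtBytes > thresholdBytes ? IndexDurabilityPath::BLOCKING_CHECKPOINT :
                                                 IndexDurabilityPath::PHYSICAL_WAL_RECORD;
}
```

A caller would pass the serialized ART size together with `configuredThresholdBytes()`; a tree exactly
at the threshold stays on the physical WAL path under this reading.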

The key safety rule is that there must be no window where a valid catalog index entry is committed
without durable physical index pages or a WAL record/recovery path that can recreate them.
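
One way to read this rule is as a decision table that recovery evaluates per index. The sketch below
is illustrative only: `IndexRecoveryFacts`, `RecoveryAction`, and `planIndexRecovery` are hypothetical
names, and the real replay code may derive these facts differently.

```cpp
// Hypothetical summary of what recovery knows about one index after a crash.
struct IndexRecoveryFacts {
    bool catalogEntryCommitted;   // a valid catalog index entry was committed
    bool pagesDurable;            // index pages are covered by a completed checkpoint
    bool hasPhysicalWalRecord;    // a physical record with serialized ART bytes is in the WAL
    bool canRebuildFromBaseTable; // large-index path: rebuild by scanning the base table
};

enum class RecoveryAction {
    NO_INDEX_TO_RECOVER,
    USE_DURABLE_PAGES,
    INSTALL_FROM_WAL,
    REBUILD_FROM_TABLE_SCAN,
    INVARIANT_VIOLATED
};

// Returns the first recovery path that applies. Reaching INVARIANT_VIOLATED
// means the unsafe window described above exists.
inline RecoveryAction planIndexRecovery(const IndexRecoveryFacts& facts) {
    if (!facts.catalogEntryCommitted) {
        return RecoveryAction::NO_INDEX_TO_RECOVER;
    }
    if (facts.pagesDurable) {
        return RecoveryAction::USE_DURABLE_PAGES;
    }
    if (facts.hasPhysicalWalRecord) {
        return RecoveryAction::INSTALL_FROM_WAL;
    }
    if (facts.canRebuildFromBaseTable) {
        return RecoveryAction::REBUILD_FROM_TABLE_SCAN;
    }
    return RecoveryAction::INVARIANT_VIOLATED;
}
```

The ordering of the checks is an assumption; the point is only that a committed entry must always
land on one of the three recoverable outcomes.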

## Future abandoned-build GC

A cheaper crash-recovery path for large index builds would write index pages before commit and keep
explicit build state:

1. Build the index into private storage while the catalog entry is not visible to queries.
2. At commit, hold the write gate and persist the index pages through the checkpoint/shadow protocol.
3. Publish the catalog entry only after the physical pages are durable.
4. Commit with a small WAL record that points at the durable index storage, or with no index WAL
payload if the checkpoint record fully covers the transaction.

If a crash happens during the build, recovery should ignore incomplete build metadata and let normal
space reclamation garbage collect the abandoned index pages later.
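
As a rough sketch of the crash case, recovery could split build metadata into published and
abandoned builds and hand the abandoned page ranges to space reclamation. Everything here is
hypothetical: the notes do not define a build-metadata format, so `IndexBuildRecord`,
`BuildRecoveryResult`, and `classifyBuildsAfterCrash` are illustrative names only.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical build metadata for one index build.
struct IndexBuildRecord {
    uint64_t startPageIdx;
    uint64_t numPages;
    bool published; // true once the catalog entry was published after the pages became durable
};

struct BuildRecoveryResult {
    std::vector<IndexBuildRecord> liveIndexes; // published builds, kept as-is
    std::vector<IndexBuildRecord> abandoned;   // incomplete builds, ignored by recovery
    uint64_t reclaimablePages = 0;             // pages left for later garbage collection
};

// Splits build records into published and abandoned ones. Abandoned page
// ranges are only counted here; reclaiming them is left to normal space reclamation.
inline BuildRecoveryResult classifyBuildsAfterCrash(
    const std::vector<IndexBuildRecord>& records) {
    BuildRecoveryResult result;
    for (const auto& record : records) {
        if (record.published) {
            result.liveIndexes.push_back(record);
        } else {
            result.abandoned.push_back(record);
            result.reclaimablePages += record.numPages;
        }
    }
    return result;
}
```

Because the catalog entry is published only after the pages are durable (step 3), an unpublished
record can be dropped without any index-specific undo work.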

## Future online build

Online index creation should use an explicit index state machine:

- `BUILDING`: catalog entry exists for coordination, but the optimizer ignores it.
- `CATCHING_UP`: the base snapshot scan has finished and pending writes are being merged.
- `VALID`: the optimizer can use the index.
- `INVALID`: build failed or recovery found an incomplete build; queries ignore it until dropped or
rebuilt.

The build should scan a snapshot of the table in parallel and maintain a side delta for concurrent
writes. At validation time, merge the delta, wait for conflicting snapshots when needed, and atomically
mark the index `VALID`.
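
The state machine could look roughly like the sketch below. The four state names come from the list
above; the transition table and the helper names `isUsableByOptimizer` and `isAllowedTransition` are
assumptions. In particular, treating `CATCHING_UP` as not yet usable and allowing
`INVALID` to re-enter `BUILDING` on rebuild are guesses at the intended design.

```cpp
// States as listed in the notes.
enum class IndexState { BUILDING, CATCHING_UP, VALID, INVALID };

// Only VALID indexes are offered to the optimizer (assumption: CATCHING_UP is
// treated like BUILDING for planning purposes).
constexpr bool isUsableByOptimizer(IndexState state) {
    return state == IndexState::VALID;
}

// Hypothetical transition table:
//   BUILDING    -> CATCHING_UP | INVALID
//   CATCHING_UP -> VALID       | INVALID
//   INVALID     -> BUILDING    (rebuild)
//   VALID       -> (none; dropping the index is modelled outside this table)
constexpr bool isAllowedTransition(IndexState from, IndexState to) {
    switch (from) {
    case IndexState::BUILDING:
        return to == IndexState::CATCHING_UP || to == IndexState::INVALID;
    case IndexState::CATCHING_UP:
        return to == IndexState::VALID || to == IndexState::INVALID;
    case IndexState::INVALID:
        return to == IndexState::BUILDING;
    case IndexState::VALID:
        return false;
    }
    return false;
}
```

Keeping the transition check in one function makes it easy for recovery to assert that a build found
in `BUILDING` or `CATCHING_UP` is only ever moved to `INVALID`.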

## Parallelism and read latency

Parallel index creation should use a global memory budget for the whole build, not one full budget per
worker. The planner/build scheduler should choose worker count from table size, available cores, and
the configured memory budget.
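
A minimal sketch of such a scheduler decision, under stated assumptions: `BuildPlan`,
`planParallelBuild`, and the two tuning parameters `minBytesPerWorker` and `minPagesPerWorker` are
hypothetical and do not appear in this change. The worker count is capped by cores, by how many
workers the global budget can feed, and by how much table there is to scan.

```cpp
#include <algorithm>
#include <cstdint>

struct BuildPlan {
    uint32_t numWorkers;
    uint64_t perWorkerBudgetBytes; // share of the single global budget
};

// Picks a worker count from table size, available cores, and one global
// memory budget, then splits that budget evenly across the workers.
inline BuildPlan planParallelBuild(uint64_t tablePages, uint32_t availableCores,
    uint64_t globalBudgetBytes, uint64_t minBytesPerWorker, uint64_t minPagesPerWorker) {
    const uint64_t byCores = std::max<uint64_t>(availableCores, 1);
    // How many workers the budget can sustain at the minimum per-worker size.
    const uint64_t byMemory = minBytesPerWorker == 0 ?
                                  byCores :
                                  std::max<uint64_t>(globalBudgetBytes / minBytesPerWorker, 1);
    // How many workers the table can keep busy (ceiling division without overflow).
    const uint64_t byTable =
        minPagesPerWorker == 0 ?
            byCores :
            std::max<uint64_t>(tablePages / minPagesPerWorker +
                                   (tablePages % minPagesPerWorker != 0 ? 1 : 0),
                1);
    const uint64_t workers = std::min({byCores, byMemory, byTable});
    return BuildPlan{static_cast<uint32_t>(workers), globalBudgetBytes / workers};
}
```

With this shape, adding cores never increases total memory use: the budget is divided among the
chosen workers rather than multiplied by them.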

To reduce read latency impact:

- Prefer sequential table scans and sequential index page writes.
- Avoid loading newly built index pages through the normal buffer-cache hot path.
- Throttle build I/O when foreground read latency rises.
- Keep generated index pages in private build buffers until they are published.
- Make the optimizer ignore `BUILDING`/`INVALID` indexes.

PostgreSQL's standard `CREATE INDEX` allows reads while blocking writes. Its `CREATE INDEX
CONCURRENTLY` keeps writes running at the cost of extra scans and validation waits. Ladybug can follow
that split: a cheaper blocking-write build first, then a fully online build path.
14 changes: 10 additions & 4 deletions src/binder/bind/bind_ddl.cpp
@@ -35,6 +35,7 @@
#include "parser/ddl/drop.h"
#include "parser/expression/parsed_function_expression.h"
#include "parser/expression/parsed_literal_expression.h"
+#include "storage/index/art_index.h"
#include "storage/index/hash_index.h"
#include "storage/storage_manager.h"
#include "transaction/transaction.h"
@@ -422,8 +423,11 @@ std::unique_ptr<BoundStatement> Binder::bindCreateIndex(const Statement& stateme
if (!nodeTableEntry->getStorage().empty()) {
throw BinderException("CREATE INDEX is only supported on native node tables.");
}
-if (!StringUtils::caseInsensitiveEquals(nodeTableEntry->getPrimaryKeyName(),
-info.propertyName)) {
+const auto isPrimaryIndex =
+StringUtils::caseInsensitiveEquals(nodeTableEntry->getPrimaryKeyName(), info.propertyName);
+const auto isArtIndex = StringUtils::caseInsensitiveEquals(indexType,
+storage::ArtPrimaryKeyIndex::getIndexType().typeName);
+if (!isPrimaryIndex && !isArtIndex) {
throw BinderException(std::format(
"{} indexes are currently supported only on node primary keys.", indexType));
}
@@ -434,7 +438,9 @@ std::unique_ptr<BoundStatement> Binder::bindCreateIndex(const Statement& stateme
auto& property = tableEntry->getProperty(info.propertyName);
std::vector<PropertyDefinition> propertyDefinitions;
propertyDefinitions.push_back(property.copy());
-validatePrimaryKey(property.getName(), propertyDefinitions);
+if (isPrimaryIndex) {
+validatePrimaryKey(property.getName(), propertyDefinitions);
+}
auto indexName = info.indexName.empty() ? std::string(storage::PrimaryKeyIndex::DEFAULT_NAME) :
info.indexName;
if (info.onConflict == ConflictAction::ON_CONFLICT_THROW) {
@@ -454,7 +460,7 @@ std::unique_ptr<BoundStatement> Binder::bindCreateIndex(const Statement& stateme
BoundCreateIndexInfo boundInfo{indexType, std::move(indexName), info.tableName,
tableEntry->getTableID(), property.getName(), tableEntry->getPropertyID(property.getName()),
tableEntry->getColumnID(property.getName()), property.getType().getPhysicalType(),
-info.onConflict};
+isPrimaryIndex, info.onConflict};
return std::make_unique<BoundCreateIndex>(std::move(boundInfo));
}

6 changes: 3 additions & 3 deletions src/catalog/catalog.cpp
@@ -321,10 +321,10 @@ bool Catalog::containsType(const Transaction* transaction, const std::string& ty
return types->containsEntry(transaction, typeName);
}

-void Catalog::createIndex(Transaction* transaction,
-std::unique_ptr<CatalogEntry> indexCatalogEntry) {
+void Catalog::createIndex(Transaction* transaction, std::unique_ptr<CatalogEntry> indexCatalogEntry,
+bool skipLoggingToWAL) {
 DASSERT(indexCatalogEntry->getType() == CatalogEntryType::INDEX_ENTRY);
-indexes->createEntry(transaction, std::move(indexCatalogEntry));
+indexes->createEntry(transaction, std::move(indexCatalogEntry), skipLoggingToWAL);
}

IndexCatalogEntry* Catalog::getIndex(const Transaction* transaction, table_id_t tableID,
5 changes: 3 additions & 2 deletions src/catalog/catalog_set.cpp
@@ -62,7 +62,8 @@ CatalogEntry* CatalogSet::getEntryNoLock(const Transaction* transaction,
return entry;
}

-oid_t CatalogSet::createEntry(Transaction* transaction, std::unique_ptr<CatalogEntry> entry) {
+oid_t CatalogSet::createEntry(Transaction* transaction, std::unique_ptr<CatalogEntry> entry,
+bool skipLoggingToWAL) {
CatalogEntry* entryPtr = nullptr;
oid_t oid = INVALID_OID;
{
@@ -73,7 +74,7 @@ oid_t CatalogSet::createEntry(Transaction* transaction, std::unique_ptr<CatalogE
}
DASSERT(entryPtr);
if (transaction->shouldAppendToUndoBuffer()) {
-transaction->pushCreateDropCatalogEntry(*this, *entryPtr, isInternal());
+transaction->pushCreateDropCatalogEntry(*this, *entryPtr, isInternal(), skipLoggingToWAL);
}
return oid;
}
76 changes: 50 additions & 26 deletions src/common/file_system/local_file_system.cpp
@@ -421,35 +421,59 @@ void LocalFileSystem::readFromFile(FileInfo& fileInfo, void* buffer, uint64_t nu
uint64_t position) const {
auto localFileInfo = fileInfo.constPtrCast<LocalFileInfo>();
DASSERT(localFileInfo->getFileSize() >= position + numBytes);
+auto outputBuffer = static_cast<uint8_t*>(buffer);
+uint64_t remainingNumBytesToRead = numBytes;
+uint64_t bufferOffset = 0;
+// Keep reads below common OS syscall transfer limits.
+constexpr uint64_t maxBytesToReadAtOnce = 1ull << 30;
+while (remainingNumBytesToRead > 0) {
+const auto numBytesToRead = (std::min)(remainingNumBytesToRead, maxBytesToReadAtOnce);
 #if defined(_WIN32)
-DWORD numBytesRead;
-OVERLAPPED overlapped = {};
-overlapped.Offset = position & 0xffffffff;
-overlapped.OffsetHigh = position >> 32;
-if (!ReadFile((HANDLE)localFileInfo->handle, buffer, numBytes, &numBytesRead, &overlapped)) {
-auto error = GetLastError();
-throw IOException(
-std::format("Cannot read from file: {} handle: {} "
-"numBytesRead: {} numBytesToRead: {} position: {}. Error {}: {}",
-fileInfo.path, (intptr_t)localFileInfo->handle, numBytesRead, numBytes, position,
-error, std::system_category().message(error)));
-}
-if (numBytesRead != numBytes && fileInfo.getFileSize() != position + numBytesRead) {
-throw IOException(std::format("Cannot read from file: {} handle: {} "
-"numBytesRead: {} numBytesToRead: {} position: {}",
-fileInfo.path, (intptr_t)localFileInfo->handle, numBytesRead, numBytes, position));
-}
+DWORD numBytesRead;
+OVERLAPPED overlapped = {};
+overlapped.Offset = position & 0xffffffff;
+overlapped.OffsetHigh = position >> 32;
+if (!ReadFile((HANDLE)localFileInfo->handle, outputBuffer + bufferOffset, numBytesToRead,
+&numBytesRead, &overlapped)) {
+auto error = GetLastError();
+throw IOException(
+std::format("Cannot read from file: {} handle: {} "
+"numBytesRead: {} numBytesToRead: {} position: {}. Error {}: {}",
+fileInfo.path, (intptr_t)localFileInfo->handle, numBytesRead, numBytesToRead,
+position, error, std::system_category().message(error)));
+}
+if (numBytesRead != numBytesToRead && fileInfo.getFileSize() != position + numBytesRead) {
+throw IOException(std::format("Cannot read from file: {} handle: {} "
+"numBytesRead: {} numBytesToRead: {} position: {}",
+fileInfo.path, (intptr_t)localFileInfo->handle, numBytesRead, numBytesToRead,
+position));
+}
 #else
-auto numBytesRead = pread(localFileInfo->fd, buffer, numBytes, position);
-if (static_cast<uint64_t>(numBytesRead) != numBytes &&
-localFileInfo->getFileSize() != position + numBytesRead) {
-// LCOV_EXCL_START
-throw IOException(std::format("Cannot read from file: {} fileDescriptor: {} "
-"numBytesRead: {} numBytesToRead: {} position: {}",
-fileInfo.path, localFileInfo->fd, numBytesRead, numBytes, position));
-// LCOV_EXCL_STOP
-}
+auto numBytesRead =
+pread(localFileInfo->fd, outputBuffer + bufferOffset, numBytesToRead, position);
+if (numBytesRead < 0) {
+// LCOV_EXCL_START
+throw IOException(std::format("Cannot read from file: {} fileDescriptor: {} "
+"numBytesRead: {} numBytesToRead: {} position: {}",
+fileInfo.path, localFileInfo->fd, numBytesRead, numBytesToRead, position));
+// LCOV_EXCL_STOP
+}
+if (static_cast<uint64_t>(numBytesRead) != numBytesToRead &&
+localFileInfo->getFileSize() != position + numBytesRead) {
+// LCOV_EXCL_START
+throw IOException(std::format("Cannot read from file: {} fileDescriptor: {} "
+"numBytesRead: {} numBytesToRead: {} position: {}",
+fileInfo.path, localFileInfo->fd, numBytesRead, numBytesToRead, position));
+// LCOV_EXCL_STOP
+}
 #endif
+if (numBytesRead == 0) {
+break;
+}
+remainingNumBytesToRead -= numBytesRead;
+position += numBytesRead;
+bufferOffset += numBytesRead;
+}
}

int64_t LocalFileSystem::readFile(FileInfo& fileInfo, void* buf, size_t nbyte) const {
2 changes: 1 addition & 1 deletion src/function/function_collection.cpp
@@ -231,7 +231,7 @@ FunctionCollection* FunctionCollection::getFunctions() {
TABLE_FUNCTION(ShowAttachedDatabasesFunction), TABLE_FUNCTION(ShowSequencesFunction),
TABLE_FUNCTION(ShowFunctionsFunction), TABLE_FUNCTION(BMInfoFunction),
TABLE_FUNCTION(FileInfoFunction), TABLE_FUNCTION(DiskSizeInfoFunction),
-TABLE_FUNCTION(ShowLoadedExtensionsFunction),
+TABLE_FUNCTION(DiskInfoFunction), TABLE_FUNCTION(ShowLoadedExtensionsFunction),
TABLE_FUNCTION(ShowOfficialExtensionsFunction), TABLE_FUNCTION(ShowIndexesFunction),
TABLE_FUNCTION(ShowProjectedGraphsFunction), TABLE_FUNCTION(ProjectedGraphInfoFunction),
TABLE_FUNCTION(ShowMacrosFunction),
78 changes: 27 additions & 51 deletions src/function/table/disk_size_info.cpp
@@ -119,48 +119,6 @@ struct DiskSizeEntry {
uint64_t sizeBytes;
};

-// Estimate the number of pages used by a hash index based on the number of entries
-// Hash index structure:
-// - INDEX_HEADER_PAGES pages for HashIndexHeaderOnDisk (2 pages for 256 sub-indexes)
-// - DiskArrayCollection header pages (1+ pages)
-// - For each of 256 sub-indexes: pSlots and oSlots disk arrays
-// - Each slot is SLOT_CAPACITY_BYTES (256 bytes), so 16 slots per page
-// - Number of primary slots = 2^currentLevel + nextSplitSlotId
-// - Overflow slots depend on collisions
-static uint64_t estimateHashIndexPages(const PrimaryKeyIndex* pkIndex) {
-if (!pkIndex) {
-return 0;
-}
-
-uint64_t totalPages = 0;
-
-// Index header pages (storing HashIndexHeaderOnDisk for all 256 sub-indexes)
-totalPages += INDEX_HEADER_PAGES; // 2 pages
-
-// DiskArrayCollection header pages (at least 1)
-// Each header page stores headers for up to ~170 disk arrays
-// With 256 sub-indexes * 2 arrays (pSlots + oSlots) = 512 arrays
-totalPages += 4; // Approximate: ~3-4 header pages for DiskArrayCollection
-
-// For each sub-index, estimate primary and overflow slot pages
-// We can access the headers through the pkIndex to get actual sizes
-// But since the headers are private, we estimate based on numEntries
-
-// Get total entries from all sub-indexes
-// Each entry requires a slot, and slots have capacity of ~3-20 entries depending on key type
-// With linear hashing, we expect ~70-80% fill rate
-
-// Rough estimation: For N entries with 8-byte keys:
-// - Slot capacity is approximately 3 entries per slot (256-byte slot / 80 bytes per entry)
-// - Number of slots ≈ N / (3 * 0.7) ≈ N / 2
-// - Pages for slots = slots / 16 (16 slots per page)
-// - Plus PIP pages for addressing
-
-// Since we can't easily access internal headers, we return the header overhead
-// and let the unaccounted calculation handle the rest
-return totalPages;
-}
-
static std::vector<DiskSizeEntry> collectDiskSizeInfo(const ClientContext* context) {
std::vector<DiskSizeEntry> entries;
auto storageManager = StorageManager::Get(*context);
@@ -203,12 +161,22 @@ static std::vector<DiskSizeEntry> collectDiskSizeInfo(const ClientContext* conte
entries.push_back(
{"node_table", tableEntry->getName(), tablePages, tablePages * LBUG_PAGE_SIZE});

-// Count primary key index header pages (rough estimate for overhead)
-auto* pkIndex = nodeTable.tryGetPKIndex();
-uint64_t indexPages = estimateHashIndexPages(pkIndex);
-if (indexPages > 0) {
-entries.push_back({"pk_index_overhead", tableEntry->getName() + "_pk", indexPages,
-indexPages * LBUG_PAGE_SIZE});
+for (const auto& indexHolder : nodeTable.getIndexes()) {
+if (!indexHolder.isLoaded()) {
+continue;
+}
+const auto* index = indexHolder.getIndex();
+const auto indexInfo = index->getIndexInfo();
+for (const auto& storageEntry : index->getStorageEntries()) {
+if (storageEntry.pageRange.startPageIdx == INVALID_PAGE_IDX ||
+storageEntry.pageRange.numPages == 0) {
+continue;
+}
+entries.push_back({"index",
+tableEntry->getName() + "." + indexInfo.name + ":" + storageEntry.component,
+storageEntry.pageRange.numPages,
+storageEntry.pageRange.numPages * LBUG_PAGE_SIZE});
+}
}
}

@@ -256,15 +224,15 @@ static std::vector<DiskSizeEntry> collectDiskSizeInfo(const ClientContext* conte
}
for (const auto& entry : entries) {
if (entry.category == "node_table" || entry.category == "rel_table" ||
-entry.category == "pk_index_overhead") {
+entry.category == "index") {
accountedPages += entry.numPages;
}
}
accountedPages += freePages;

if (totalFilePages > accountedPages) {
uint64_t unaccountedPages = totalFilePages - accountedPages;
-entries.push_back({"index_data", "hash_index_slots", unaccountedPages,
+entries.push_back({"unaccounted", "unaccounted_pages", unaccountedPages,
unaccountedPages * LBUG_PAGE_SIZE});
}

@@ -308,7 +276,7 @@ static std::unique_ptr<TableFuncBindData> bindFunc(const ClientContext* context,
return std::make_unique<DiskSizeInfoBindData>(columns, entries.size(), context);
}

-function_set DiskSizeInfoFunction::getFunctionSet() {
+static function_set getDiskSizeInfoFunctionSet(const char* name) {
function_set functionSet;
auto function = std::make_unique<TableFunction>(name, std::vector<LogicalTypeID>{});
function->tableFunc = SimpleTableFunc::getTableFunc(internalTableFunc);
@@ -319,5 +287,13 @@ function_set DiskSizeInfoFunction::getFunctionSet() {
return functionSet;
}

+function_set DiskSizeInfoFunction::getFunctionSet() {
+return getDiskSizeInfoFunctionSet(name);
+}
+
+function_set DiskInfoFunction::getFunctionSet() {
+return getDiskSizeInfoFunctionSet(name);
+}
+
} // namespace function
} // namespace lbug