Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions src/binder/bind/bind_ddl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "parser/ddl/drop.h"
#include "parser/expression/parsed_function_expression.h"
#include "parser/expression/parsed_literal_expression.h"
#include "storage/index/art_index.h"
#include "storage/index/hash_index.h"
#include "storage/storage_manager.h"
#include "transaction/transaction.h"
Expand Down Expand Up @@ -422,8 +423,11 @@ std::unique_ptr<BoundStatement> Binder::bindCreateIndex(const Statement& stateme
if (!nodeTableEntry->getStorage().empty()) {
throw BinderException("CREATE INDEX is only supported on native node tables.");
}
if (!StringUtils::caseInsensitiveEquals(nodeTableEntry->getPrimaryKeyName(),
info.propertyName)) {
const auto isPrimaryIndex =
StringUtils::caseInsensitiveEquals(nodeTableEntry->getPrimaryKeyName(), info.propertyName);
const auto isArtIndex = StringUtils::caseInsensitiveEquals(indexType,
storage::ArtPrimaryKeyIndex::getIndexType().typeName);
if (!isPrimaryIndex && !isArtIndex) {
throw BinderException(std::format(
"{} indexes are currently supported only on node primary keys.", indexType));
}
Expand All @@ -434,7 +438,9 @@ std::unique_ptr<BoundStatement> Binder::bindCreateIndex(const Statement& stateme
auto& property = tableEntry->getProperty(info.propertyName);
std::vector<PropertyDefinition> propertyDefinitions;
propertyDefinitions.push_back(property.copy());
validatePrimaryKey(property.getName(), propertyDefinitions);
if (isPrimaryIndex) {
validatePrimaryKey(property.getName(), propertyDefinitions);
}
auto indexName = info.indexName.empty() ? std::string(storage::PrimaryKeyIndex::DEFAULT_NAME) :
info.indexName;
if (info.onConflict == ConflictAction::ON_CONFLICT_THROW) {
Expand All @@ -454,7 +460,7 @@ std::unique_ptr<BoundStatement> Binder::bindCreateIndex(const Statement& stateme
BoundCreateIndexInfo boundInfo{indexType, std::move(indexName), info.tableName,
tableEntry->getTableID(), property.getName(), tableEntry->getPropertyID(property.getName()),
tableEntry->getColumnID(property.getName()), property.getType().getPhysicalType(),
info.onConflict};
isPrimaryIndex, info.onConflict};
return std::make_unique<BoundCreateIndex>(std::move(boundInfo));
}

Expand Down
76 changes: 50 additions & 26 deletions src/common/file_system/local_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -421,35 +421,59 @@ void LocalFileSystem::readFromFile(FileInfo& fileInfo, void* buffer, uint64_t nu
uint64_t position) const {
auto localFileInfo = fileInfo.constPtrCast<LocalFileInfo>();
DASSERT(localFileInfo->getFileSize() >= position + numBytes);
auto outputBuffer = static_cast<uint8_t*>(buffer);
uint64_t remainingNumBytesToRead = numBytes;
uint64_t bufferOffset = 0;
// Keep reads below common OS syscall transfer limits.
constexpr uint64_t maxBytesToReadAtOnce = 1ull << 30;
while (remainingNumBytesToRead > 0) {
const auto numBytesToRead = (std::min)(remainingNumBytesToRead, maxBytesToReadAtOnce);
#if defined(_WIN32)
DWORD numBytesRead;
OVERLAPPED overlapped = {};
overlapped.Offset = position & 0xffffffff;
overlapped.OffsetHigh = position >> 32;
if (!ReadFile((HANDLE)localFileInfo->handle, buffer, numBytes, &numBytesRead, &overlapped)) {
auto error = GetLastError();
throw IOException(
std::format("Cannot read from file: {} handle: {} "
"numBytesRead: {} numBytesToRead: {} position: {}. Error {}: {}",
fileInfo.path, (intptr_t)localFileInfo->handle, numBytesRead, numBytes, position,
error, std::system_category().message(error)));
}
if (numBytesRead != numBytes && fileInfo.getFileSize() != position + numBytesRead) {
throw IOException(std::format("Cannot read from file: {} handle: {} "
"numBytesRead: {} numBytesToRead: {} position: {}",
fileInfo.path, (intptr_t)localFileInfo->handle, numBytesRead, numBytes, position));
}
DWORD numBytesRead;
OVERLAPPED overlapped = {};
overlapped.Offset = position & 0xffffffff;
overlapped.OffsetHigh = position >> 32;
if (!ReadFile((HANDLE)localFileInfo->handle, outputBuffer + bufferOffset, numBytesToRead,
&numBytesRead, &overlapped)) {
auto error = GetLastError();
throw IOException(
std::format("Cannot read from file: {} handle: {} "
"numBytesRead: {} numBytesToRead: {} position: {}. Error {}: {}",
fileInfo.path, (intptr_t)localFileInfo->handle, numBytesRead, numBytesToRead,
position, error, std::system_category().message(error)));
}
if (numBytesRead != numBytesToRead && fileInfo.getFileSize() != position + numBytesRead) {
throw IOException(std::format("Cannot read from file: {} handle: {} "
"numBytesRead: {} numBytesToRead: {} position: {}",
fileInfo.path, (intptr_t)localFileInfo->handle, numBytesRead, numBytesToRead,
position));
}
#else
auto numBytesRead = pread(localFileInfo->fd, buffer, numBytes, position);
if (static_cast<uint64_t>(numBytesRead) != numBytes &&
localFileInfo->getFileSize() != position + numBytesRead) {
// LCOV_EXCL_START
throw IOException(std::format("Cannot read from file: {} fileDescriptor: {} "
"numBytesRead: {} numBytesToRead: {} position: {}",
fileInfo.path, localFileInfo->fd, numBytesRead, numBytes, position));
// LCOV_EXCL_STOP
}
auto numBytesRead =
pread(localFileInfo->fd, outputBuffer + bufferOffset, numBytesToRead, position);
if (numBytesRead < 0) {
// LCOV_EXCL_START
throw IOException(std::format("Cannot read from file: {} fileDescriptor: {} "
"numBytesRead: {} numBytesToRead: {} position: {}",
fileInfo.path, localFileInfo->fd, numBytesRead, numBytesToRead, position));
// LCOV_EXCL_STOP
}
if (static_cast<uint64_t>(numBytesRead) != numBytesToRead &&
localFileInfo->getFileSize() != position + numBytesRead) {
// LCOV_EXCL_START
throw IOException(std::format("Cannot read from file: {} fileDescriptor: {} "
"numBytesRead: {} numBytesToRead: {} position: {}",
fileInfo.path, localFileInfo->fd, numBytesRead, numBytesToRead, position));
// LCOV_EXCL_STOP
}
#endif
if (numBytesRead == 0) {
break;
}
remainingNumBytesToRead -= numBytesRead;
position += numBytesRead;
bufferOffset += numBytesRead;
}
}

int64_t LocalFileSystem::readFile(FileInfo& fileInfo, void* buf, size_t nbyte) const {
Expand Down
1 change: 1 addition & 0 deletions src/include/binder/ddl/bound_create_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ struct BoundCreateIndexInfo {
common::property_id_t propertyID;
common::column_id_t columnID;
common::PhysicalTypeID keyDataType;
bool isPrimary;
common::ConflictAction onConflict;

BoundCreateIndexInfo copy() const { return *this; }
Expand Down
13 changes: 13 additions & 0 deletions src/include/planner/operator/scan/logical_scan_node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace planner {
// Discriminator for the flavour of a logical node-table scan, stored in a single byte.
// NOTE(review): the meanings below are inferred from the enumerator names only;
// confirm against the planner code that switches on this type.
enum class LogicalScanNodeTableType : uint8_t {
// Presumably a plain scan of the node table.
SCAN = 0,
// Presumably a lookup driven by the table's primary key.
PRIMARY_KEY_SCAN = 1,
// Presumably a lookup driven by a non-primary (secondary) index.
SECONDARY_INDEX_SCAN = 2,
};

struct ExtraScanNodeTableInfo {
Expand Down Expand Up @@ -45,6 +46,18 @@ struct PrimaryKeyScanInfo final : ExtraScanNodeTableInfo {
}
};

struct SecondaryIndexScanInfo final : ExtraScanNodeTableInfo {
std::string indexName;
std::shared_ptr<binder::Expression> key;

SecondaryIndexScanInfo(std::string indexName, std::shared_ptr<binder::Expression> key)
: indexName{std::move(indexName)}, key{std::move(key)} {}

std::unique_ptr<ExtraScanNodeTableInfo> copy() const override {
return std::make_unique<SecondaryIndexScanInfo>(indexName, key);
}
};

struct LogicalScanNodeTablePrintInfo final : OPPrintInfo {
std::shared_ptr<binder::Expression> nodeID;
binder::expression_vector properties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ class PrimaryKeyScanNodeTable : public ScanTable {
PrimaryKeyScanNodeTable(ScanOpInfo opInfo, std::vector<ScanNodeTableInfo> tableInfos,
std::unique_ptr<evaluator::ExpressionEvaluator> indexEvaluator,
std::unique_ptr<evaluator::ExpressionEvaluator> upperBoundEvaluator, bool isRange,
bool lowerInclusive, bool upperInclusive,
bool isIndexEquality, bool lowerInclusive, bool upperInclusive, std::string indexName,
std::shared_ptr<PrimaryKeyScanSharedState> sharedState, physical_op_id id,
std::unique_ptr<OPPrintInfo> printInfo)
: ScanTable{type_, std::move(opInfo), id, std::move(printInfo)}, scanState{nullptr},
tableInfos{std::move(tableInfos)}, indexEvaluator{std::move(indexEvaluator)},
upperBoundEvaluator{std::move(upperBoundEvaluator)}, sharedState{std::move(sharedState)},
isRange{isRange}, lowerInclusive{lowerInclusive}, upperInclusive{upperInclusive},
currentRangeTableIdx{0}, rangeOffsetCursor{0} {}
isRange{isRange}, isIndexEquality{isIndexEquality}, lowerInclusive{lowerInclusive},
upperInclusive{upperInclusive}, indexName{std::move(indexName)}, currentRangeTableIdx{0},
rangeOffsetCursor{0} {}

bool isSource() const override { return true; }

Expand All @@ -68,7 +69,8 @@ class PrimaryKeyScanNodeTable : public ScanTable {
return std::make_unique<PrimaryKeyScanNodeTable>(opInfo.copy(), copyVector(tableInfos),
indexEvaluator == nullptr ? nullptr : indexEvaluator->copy(),
upperBoundEvaluator == nullptr ? nullptr : upperBoundEvaluator->copy(), isRange,
lowerInclusive, upperInclusive, sharedState, id, printInfo->copy());
isIndexEquality, lowerInclusive, upperInclusive, indexName, sharedState, id,
printInfo->copy());
}

private:
Expand All @@ -81,8 +83,10 @@ class PrimaryKeyScanNodeTable : public ScanTable {
std::unique_ptr<evaluator::ExpressionEvaluator> upperBoundEvaluator;
std::shared_ptr<PrimaryKeyScanSharedState> sharedState;
bool isRange;
bool isIndexEquality;
bool lowerInclusive;
bool upperInclusive;
std::string indexName;
common::idx_t currentRangeTableIdx;
std::vector<common::offset_t> rangeOffsets;
common::idx_t rangeOffsetCursor;
Expand Down
64 changes: 54 additions & 10 deletions src/include/storage/index/art_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@
#include "common/types/string_t.h"
#include "common/types/uint128_t.h"
#include "storage/index/index.h"
#include "storage/page_range.h"

namespace lbug {
namespace common {
struct BufferReader;
}
namespace storage {

class FileHandle;
class ShadowFile;

class ArtKey {
public:
ArtKey() = default;
Expand All @@ -34,11 +38,15 @@ class ArtKey {

struct ArtPrimaryKeyIndexStorageInfo final : IndexStorageInfo {
std::vector<std::pair<std::vector<uint8_t>, common::offset_t>> entries;
PageRange treePageRange;
uint64_t treeSize = 0;

ArtPrimaryKeyIndexStorageInfo() = default;
explicit ArtPrimaryKeyIndexStorageInfo(
std::vector<std::pair<std::vector<uint8_t>, common::offset_t>> entries)
: entries{std::move(entries)} {}
ArtPrimaryKeyIndexStorageInfo(PageRange treePageRange, uint64_t treeSize)
: treePageRange{treePageRange}, treeSize{treeSize} {}
DELETE_COPY_DEFAULT_MOVE(ArtPrimaryKeyIndexStorageInfo);

std::shared_ptr<common::BufferWriter> serialize() const override;
Expand Down Expand Up @@ -67,25 +75,34 @@ class ArtPrimaryKeyIndex final : public Index {
const common::ValueVector& nodeIDVector,
const std::vector<common::ValueVector*>& indexVectors,
Index::InsertState& insertState) override;
std::unique_ptr<UpdateState> initUpdateState(main::ClientContext* context,
common::column_id_t columnID, visible_func isVisible) override;
void update(transaction::Transaction* transaction, const common::ValueVector& nodeIDVector,
common::ValueVector& propertyVector, UpdateState& updateState) override;

std::unique_ptr<DeleteState> initDeleteState(const transaction::Transaction*, MemoryManager*,
visible_func) override {
return std::make_unique<DeleteState>();
}
void delete_(transaction::Transaction*, const common::ValueVector&, DeleteState&) override {
// Visibility rules filter deleted rows. Physical removal is used only for rollback cleanup.
}
void delete_(transaction::Transaction*, const common::ValueVector& nodeIDVector,
DeleteState&) override;

bool lookupPrimaryKey(const transaction::Transaction* transaction,
common::ValueVector* keyVector, uint64_t vectorPos, common::offset_t& result,
visible_func isVisible) override;
bool lookupAll(const transaction::Transaction* transaction, common::ValueVector* keyVector,
uint64_t vectorPos, std::vector<common::offset_t>& results,
visible_func isVisible) override;
bool scanPrimaryKeyRange(common::ValueVector* lowerBoundVector, uint64_t lowerBoundPos,
bool lowerInclusive, common::ValueVector* upperBoundVector, uint64_t upperBoundPos,
bool upperInclusive, common::idx_t maxResults, std::vector<common::offset_t>& results,
visible_func isVisible) override;
void discardPrimaryKey(common::ValueVector* keyVector) override;

void checkpoint(main::ClientContext*, PageAllocator&) override;
void checkpoint(main::ClientContext*, PageAllocator&, ShadowFile&) override;
void rollbackCheckpoint() override;
void serialize(common::Serializer& ser) const override;
void reclaimStorage(PageAllocator& pageAllocator) const override;

static LBUG_API std::unique_ptr<Index> load(main::ClientContext* context,
StorageManager* storageManager, IndexInfo indexInfo, std::span<uint8_t> storageInfoBuffer);
Expand Down Expand Up @@ -114,19 +131,26 @@ class ArtPrimaryKeyIndex final : public Index {
};

std::optional<common::offset_t> offset;
std::unique_ptr<std::vector<common::offset_t>> overflowOffsets;
std::vector<uint8_t> prefix;
Kind kind = Kind::NODE4;
uint16_t count = 0;
union {
SmallChildren small;
Node48Children node48;
Node256Children node256;
};
SmallChildren small;
std::unique_ptr<Node48Children> node48;
std::unique_ptr<Node256Children> node256;

Node();
Node* getChild(uint8_t byte) const;
Node* getOrInsertChild(ArtPrimaryKeyIndex& index, uint8_t byte);
void insertChild(ArtPrimaryKeyIndex& index, uint8_t byte, Node* child);
void removeChild(uint8_t byte);
bool empty() const { return !offset.has_value() && count == 0; }
bool hasOffsets() const {
return offset.has_value() || (overflowOffsets && !overflowOffsets->empty());
}
bool empty() const {
return !offset.has_value() && (!overflowOffsets || overflowOffsets->empty()) &&
count == 0;
}
};

static constexpr uint64_t NODE_BLOCK_CAPACITY = 16 * 1024;
Expand All @@ -144,25 +168,45 @@ class ArtPrimaryKeyIndex final : public Index {
};

bool insertInternal(const ArtKey& key, common::offset_t offset, visible_func isVisible);
void insertSecondaryInternal(const ArtKey& key, common::offset_t offset);
Node* findOrCreateLeaf(const std::vector<uint8_t>& key);
bool lookup(const ArtKey& key, common::offset_t& result, visible_func isVisible) const;
const Node* findLeaf(const ArtKey& key) const;
void appendVisibleOffsets(const Node& node, std::vector<common::offset_t>& results,
visible_func isVisible) const;
bool eraseInternal(Node& node, const std::vector<uint8_t>& key, uint64_t depth);
void erase(const ArtKey& key);
static void eraseOffsetFromLeaf(Node& node, common::offset_t offset);
static void resetNodePayload(Node& node);
bool eraseOffsetInternal(Node& node, common::offset_t offset);
Node* allocateNode();
void recordKindChange(Node& node, Node::Kind newKind);
void collectRange(const Node& node, std::vector<uint8_t>& key, const ArtKey* lowerBound,
bool lowerInclusive, const ArtKey* upperBound, bool upperInclusive,
common::idx_t maxResults, std::vector<common::offset_t>& results,
visible_func isVisible) const;
void clear();
uint64_t calculateSerializedTreeSize(const Node& node) const;
void serializeTree(const Node& node, common::Serializer& serializer) const;
template<class READER>
void loadTree(READER& reader, Node& node);
void collectEntries(const Node& node, std::vector<uint8_t>& key,
std::vector<std::pair<std::vector<uint8_t>, common::offset_t>>& entries) const;
void loadEntries(const ArtPrimaryKeyIndexStorageInfo& storageInfo);
void materializeDiskTree();

private:
Node root;
std::vector<NodeBlock> nodeBlocks;
uint64_t numAllocatedNodes = 1;
std::array<uint64_t, 4> numNodesByKind{1, 0, 0, 0};
FileHandle* diskFileHandle = nullptr;
PageRange diskTreePageRange;
uint64_t diskTreeSize = 0;
bool diskBacked = false;
PageRange checkpointRollbackTreePageRange;
uint64_t checkpointRollbackTreeSize = 0;
bool hasCheckpointRollbackState = false;
mutable std::mutex mutex;
};

Expand Down
5 changes: 3 additions & 2 deletions src/include/storage/index/hash_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,8 @@ class PrimaryKeyIndex final : public Index {
void discardPrimaryKey(common::ValueVector* keyVector) override;

void checkpointInMemory() override;
void checkpoint(main::ClientContext*, storage::PageAllocator& pageAllocator) override;
void checkpoint(main::ClientContext*, storage::PageAllocator& pageAllocator,
ShadowFile&) override;
OverflowFile* getOverflowFile() const { return overflowFile.get(); }

void rollbackCheckpoint() override;
Expand All @@ -470,7 +471,7 @@ class PrimaryKeyIndex final : public Index {
DASSERT(indexInfo.keyDataTypes.size() == 1);
return indexInfo.keyDataTypes[0];
}
void reclaimStorage(PageAllocator& pageAllocator) const;
void reclaimStorage(PageAllocator& pageAllocator) const override;

static LBUG_API std::unique_ptr<Index> load(main::ClientContext* context,
StorageManager* storageManager, IndexInfo indexInfo, std::span<uint8_t> storageInfoBuffer);
Expand Down
Loading
Loading