Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1432,6 +1432,10 @@ CONF_mInt64(lake_publish_version_slow_log_ms, "1000");
CONF_mString(lake_vacuum_retry_pattern, "*request rate*");
CONF_mInt64(lake_vacuum_retry_max_attempts, "5");
CONF_mInt64(lake_vacuum_retry_min_delay_ms, "100");
// Whether vacuum tasks honor the timeout carried in the request (VacuumRequest.timeout_ms)
// and abort themselves once it elapses. Set to false to let vacuum tasks always run to
// completion no matter how long the FE caller waits.
CONF_mBool(lake_vacuum_enable_task_timeout, "true");
CONF_mInt64(lake_max_garbage_version_distance, "100");
CONF_mBool(enable_primary_key_recover, "false");
CONF_mBool(lake_enable_compaction_async_write, "false");
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config_fwd_headers_manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,7 @@
"lake_vacuum_retry_pattern",
"lake_vacuum_retry_max_attempts",
"lake_vacuum_retry_min_delay_ms",
"lake_vacuum_enable_task_timeout",
"lake_max_garbage_version_distance",
"enable_strict_delvec_crc_check",
"lake_clear_corrupted_cache_meta",
Expand Down
5 changes: 5 additions & 0 deletions be/src/common/config_lake_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ CONF_mInt64(lake_vacuum_retry_max_attempts, "5");

CONF_mInt64(lake_vacuum_retry_min_delay_ms, "100");

// Whether vacuum tasks honor the timeout carried in the request (VacuumRequest.timeout_ms)
// and abort themselves once it elapses. Set to false to let vacuum tasks always run to
// completion no matter how long the FE caller waits.
CONF_mBool(lake_vacuum_enable_task_timeout, "true");

CONF_mInt64(lake_max_garbage_version_distance, "100");

// Enable cleanup of orphan delvec entries during compaction.
Expand Down
13 changes: 12 additions & 1 deletion be/src/service/service_be/lake_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1724,11 +1724,22 @@ void LakeServiceImpl::vacuum(::google::protobuf::RpcController* controller, cons

TEST_SYNC_POINT("LakeServiceImpl::vacuum:2");

// Anchor the deadline at the time the request is received: the FE caller waits at most
// |timeout_ms| from now, so once the deadline passes (whether the task waited in the
// thread pool queue or is in the middle of vacuuming) the task aborts itself instead of
// keeping a vacuum worker occupied for a response nobody reads. Requests without the
// field (older FE versions) carry no deadline and run to completion as before, and
// setting |lake_vacuum_enable_task_timeout| to false disables the deadline entirely.
int64_t deadline_ms = 0;
if (config::lake_vacuum_enable_task_timeout && request->has_timeout_ms() && request->timeout_ms() > 0) {
deadline_ms = butil::gettimeofday_ms() + request->timeout_ms();
}

auto latch = BThreadCountDownLatch(1);
auto task = std::make_shared<CancellableRunnable>(
[&] {
DeferOp defer([&] { latch.count_down(); });
lake::vacuum(_tablet_mgr, *request, response);
lake::vacuum(_tablet_mgr, *request, response, deadline_ms);
},
[&] {
Status st = Status::Cancelled("vacuum task has been cancelled");
Expand Down
42 changes: 34 additions & 8 deletions be/src/storage/lake/vacuum.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <butil/fast_rand.h>
#include <butil/time.h>
#include <bvar/bvar.h>
#include <fmt/format.h>

#include <optional>
#include <set>
Expand Down Expand Up @@ -176,6 +177,23 @@ bool should_retry(const Status& st, int64_t attempted_retries) {
return MatchPattern(message, config::lake_vacuum_retry_pattern.value());
}

// Checks whether a vacuum task has run out of time.
//
// |deadline_ms| is an absolute wall-clock time in milliseconds since the Epoch; a value
// <= 0 means "no deadline" and always yields Status::OK(). Otherwise Status::TimedOut is
// returned as soon as the current time reaches |deadline_ms|.
//
// The deadline is anchored at the time the BE received the vacuum request, so it also
// expires for tasks that waited too long in the thread pool queue. By then the FE caller
// has given up waiting and would re-dispatch the partition; continuing would only keep a
// vacuum worker occupied for a response nobody reads.
Status check_vacuum_deadline(int64_t deadline_ms) {
    if (deadline_ms > 0) {
        int64_t current_ms = butil::gettimeofday_ms();
        // Test hook: lets a sync point callback overwrite the observed clock value.
        TEST_SYNC_POINT_CALLBACK("vacuum:check_deadline", &current_ms);
        if (current_ms >= deadline_ms) {
            return Status::TimedOut(
                    fmt::format("vacuum task deadline exceeded, now={}, deadline={}", current_ms, deadline_ms));
        }
    }
    return Status::OK();
}

Status delete_files_with_retry(FileSystem* fs, std::span<const std::string> paths) {
const int64_t base = config::lake_vacuum_retry_min_delay_ms;
const int64_t max_retries = config::lake_vacuum_retry_max_attempts;
Expand Down Expand Up @@ -417,7 +435,7 @@ static Status collect_files_to_vacuum(TabletManager* tablet_mgr, std::string_vie
AsyncFileDeleter* datafile_deleter, AsyncFileDeleter* metafile_deleter,
AsyncSharedFileDeleter* shared_file_deleter, int64_t* total_datafile_size,
int64_t* vacuumed_version, int64_t* extra_datafile_size,
const TabletRetainInfo& retain_info) {
const TabletRetainInfo& retain_info, int64_t deadline_ms) {
auto t0 = butil::gettimeofday_ms();
auto meta_dir = join_path(root_dir, kMetadataDirectoryName);
auto data_dir = join_path(root_dir, kSegmentDirectoryName);
Expand All @@ -437,6 +455,9 @@ static Status collect_files_to_vacuum(TabletManager* tablet_mgr, std::string_vie
// Starting at |*final_retain_version|, read the tablet metadata forward along
// the |prev_garbage_version| pointer until the tablet metadata does not exist.
while (version >= min_version) {
if (auto st = check_vacuum_deadline(deadline_ms); !st.ok()) {
return Status::TimedOut(fmt::format("{} tablet_id={}", st.message(), tablet_id));
}
// fill data cache to avoid read bundle meta file from remote storage repeatedly.
auto res = tablet_mgr->get_tablet_metadata(
tablet_id, version, false /* Not need to fill meta cache */,
Expand Down Expand Up @@ -557,7 +578,7 @@ static Status vacuum_tablet_metadata(TabletManager* tablet_mgr, std::string_view
int64_t grace_timestamp, bool enable_file_bundling,
bool enable_shared_file_cleanup, int64_t* vacuumed_files,
int64_t* vacuumed_file_size, int64_t* vacuumed_version, int64_t* extra_file_size,
const std::unordered_set<int64_t>& retain_versions) {
const std::unordered_set<int64_t>& retain_versions, int64_t deadline_ms) {
DCHECK(tablet_mgr != nullptr);
DCHECK(std::is_sorted(tablet_infos.begin(), tablet_infos.end(),
[](const auto& a, const auto& b) { return a.tablet_id() < b.tablet_id(); }));
Expand Down Expand Up @@ -586,7 +607,7 @@ static Status vacuum_tablet_metadata(TabletManager* tablet_mgr, std::string_view
RETURN_IF_ERROR(collect_files_to_vacuum(tablet_mgr, root_dir, tablet_info, grace_timestamp, min_retain_version,
vacuum_version_range.get(), &datafile_deleter, &metafile_deleter,
&shared_file_deleter, vacuumed_file_size, &tablet_vacuumed_version,
extra_file_size, tablet_retain_info));
extra_file_size, tablet_retain_info, deadline_ms));
RETURN_IF_ERROR(datafile_deleter.finish());
(*vacuumed_files) += datafile_deleter.delete_count();
if (!enable_file_bundling) {
Expand Down Expand Up @@ -795,7 +816,8 @@ Status vacuum_load_spill(std::string_view root_location, int64_t min_active_txn_
return ret;
}

Status vacuum_impl(TabletManager* tablet_mgr, const VacuumRequest& request, VacuumResponse* response) {
Status vacuum_impl(TabletManager* tablet_mgr, const VacuumRequest& request, VacuumResponse* response,
int64_t deadline_ms) {
if (UNLIKELY(tablet_mgr == nullptr)) {
return Status::InvalidArgument("tablet_mgr is null");
}
Expand All @@ -808,6 +830,9 @@ Status vacuum_impl(TabletManager* tablet_mgr, const VacuumRequest& request, Vacu
if (UNLIKELY(request.grace_timestamp() <= 0)) {
return Status::InvalidArgument("value of grace_timestamp is zero or nagative");
}
// The task may have stayed in the thread pool queue long enough that the FE caller
// has already timed out and given up; in that case, abort without doing any work.
RETURN_IF_ERROR(check_vacuum_deadline(deadline_ms));

auto tablet_infos = std::vector<TabletInfoPB>();
if (request.tablet_infos_size() > 0) {
Expand Down Expand Up @@ -852,7 +877,8 @@ Status vacuum_impl(TabletManager* tablet_mgr, const VacuumRequest& request, Vacu
request.has_enable_shared_file_cleanup() ? request.enable_shared_file_cleanup() : enable_file_bundling;
RETURN_IF_ERROR(vacuum_tablet_metadata(tablet_mgr, root_loc, tablet_infos, min_retain_version, grace_timestamp,
enable_file_bundling, enable_shared_file_cleanup, &vacuumed_files,
&vacuumed_file_size, &vacuumed_version, &extra_file_size, retain_versions));
&vacuumed_file_size, &vacuumed_version, &extra_file_size, retain_versions,
deadline_ms));
Comment thread
starrocks-xupeng marked this conversation as resolved.
extra_file_size -= vacuumed_file_size;
if (request.delete_txn_log()) {
RETURN_IF_ERROR(vacuum_txn_log(root_loc, min_active_txn_id, &vacuumed_files, &vacuumed_file_size));
Expand All @@ -871,9 +897,9 @@ Status vacuum_impl(TabletManager* tablet_mgr, const VacuumRequest& request, Vacu
return Status::OK();
}

void vacuum(TabletManager* tablet_mgr, const VacuumRequest& request, VacuumResponse* response) {
auto st = vacuum_impl(tablet_mgr, request, response);
LOG_IF(ERROR, !st.ok()) << st;
// Runs vacuum_impl() for |request| and stores the resulting status in |response|.
// |deadline_ms| is forwarded unchanged to vacuum_impl(). A failure is logged at ERROR
// level together with the partition id carried by the request.
void vacuum(TabletManager* tablet_mgr, const VacuumRequest& request, VacuumResponse* response, int64_t deadline_ms) {
    auto status = vacuum_impl(tablet_mgr, request, response, deadline_ms);
    if (!status.ok()) {
        LOG(ERROR) << "Fail to vacuum partition " << request.partition_id() << ": " << status;
    }
    status.to_protobuf(response->mutable_status());
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/lake/vacuum.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace starrocks::lake {

class TabletManager;

void vacuum(TabletManager* tablet_mgr, const VacuumRequest& request, VacuumResponse* response);
void vacuum(TabletManager* tablet_mgr, const VacuumRequest& request, VacuumResponse* response, int64_t deadline_ms = 0);

// REQUIRES:
// - tablet_mgr != NULL
Expand Down
62 changes: 62 additions & 0 deletions be/test/service/lake_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4107,6 +4107,68 @@ TEST_F(LakeServiceTest, test_duplicated_vacuum_request) {
ASSERT_TRUE(duplicate);
}

// Verifies how the vacuum RPC handler treats the request timeout:
//   1. a request carrying timeout_ms fails with TIMEOUT once the deadline has passed;
//   2. a request without timeout_ms carries no deadline and succeeds;
//   3. with lake_vacuum_enable_task_timeout == false, timeout_ms is ignored.
TEST_F(LakeServiceTest, test_vacuum_task_deadline_exceeded) {
    // Make every deadline check observe a clock far past the deadline. The callback only
    // fires when the handler threads a positive deadline into the vacuum task, so this also
    // guards against regressions where the handler stops passing the deadline down.
    SyncPoint::GetInstance()->SetCallBack("vacuum:check_deadline",
                                          [](void* arg) { *static_cast<int64_t*>(arg) = int64_t{1} << 62; });
    SyncPoint::GetInstance()->EnableProcessing();
    DeferOp defer([]() {
        SyncPoint::GetInstance()->ClearCallBack("vacuum:check_deadline");
        SyncPoint::GetInstance()->DisableProcessing();
    });

    // Sends one vacuum request for |_tablet_id|, optionally carrying a one hour timeout,
    // and returns the status code found in the response.
    auto run_vacuum = [&](bool with_timeout) {
        brpc::Controller cntl;
        VacuumRequest request;
        VacuumResponse response;
        request.add_tablet_ids(_tablet_id);
        request.set_partition_id(next_id());
        request.set_min_retain_version(1);
        request.set_grace_timestamp(::time(nullptr));
        if (with_timeout) {
            request.set_timeout_ms(60 * 60 * 1000L);
        }
        _lake_service.vacuum(&cntl, &request, &response, nullptr);
        EXPECT_FALSE(cntl.Failed()) << cntl.ErrorText();
        return response.status().status_code();
    };

    {
        // A request carrying timeout_ms aborts with TIMEOUT once the deadline passes.
        const auto status_code = run_vacuum(/*with_timeout=*/true);
        ASSERT_EQ(TStatusCode::TIMEOUT, status_code) << status_code;
    }

    {
        // A request without timeout_ms (older FE versions) carries no deadline: even with the
        // mocked clock the task runs to completion as before.
        const auto status_code = run_vacuum(/*with_timeout=*/false);
        ASSERT_EQ(0, status_code) << status_code;
    }

    {
        // Setting lake_vacuum_enable_task_timeout to false disables the deadline: a request
        // carrying timeout_ms still runs to completion.
        bool old_value = config::lake_vacuum_enable_task_timeout;
        config::lake_vacuum_enable_task_timeout = false;
        DeferOp restore_config([old_value] { config::lake_vacuum_enable_task_timeout = old_value; });
        const auto status_code = run_vacuum(/*with_timeout=*/true);
        ASSERT_EQ(0, status_code) << status_code;
    }
}

TEST_F(LakeServiceTest, test_lock_and_unlock_tablet_metadata) {
{
LockTabletMetadataRequest request;
Expand Down
79 changes: 79 additions & 0 deletions be/test/storage/lake/vacuum_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3657,6 +3657,85 @@ TEST_P(LakeVacuumTest, full_vacuum_drops_local_cache_for_active_idx) {
EXPECT_NE(std::find(dropped.begin(), dropped.end(), std::string("8003_active.idx")), dropped.end());
}

// A deadline that expires while walking the version chain must stop the walk early without
// deleting anything; a later run without a deadline completes normally.
// NOLINTNEXTLINE
TEST_P(LakeVacuumTest, test_vacuum_deadline_expired_mid_walk) {
    // Build the version chain 4 -> 3 -> 2 for tablet 20002, linked via |prev_garbage_version|.
    ASSERT_OK(_tablet_mgr->put_tablet_metadata(json_to_pb<TabletMetadataPB>(R"DEL(
        {
        "id": 20002,
        "version": 2,
        "prev_garbage_version": 0,
        "commit_time": 1687331159
        }
        )DEL")));
    ASSERT_OK(_tablet_mgr->put_tablet_metadata(json_to_pb<TabletMetadataPB>(R"DEL(
        {
        "id": 20002,
        "version": 3,
        "prev_garbage_version": 2,
        "commit_time": 1687331160
        }
        )DEL")));
    ASSERT_OK(_tablet_mgr->put_tablet_metadata(json_to_pb<TabletMetadataPB>(R"DEL(
        {
        "id": 20002,
        "version": 4,
        "prev_garbage_version": 3,
        "commit_time": 1687331161
        }
        )DEL")));

    // The first checks (request entry, first two walk iterations) observe a mocked clock
    // before the deadline. Every later check observes one far past it, so the deadline
    // expires in the middle of the version chain walk.
    int64_t check_count = 0;
    SyncPoint::GetInstance()->SetCallBack("vacuum:check_deadline", [&](void* arg) {
        check_count++;
        *static_cast<int64_t*>(arg) = (check_count > 3) ? (int64_t{1} << 62) : 0;
    });
    SyncPoint::GetInstance()->EnableProcessing();
    DeferOp defer([]() {
        SyncPoint::GetInstance()->ClearCallBack("vacuum:check_deadline");
        SyncPoint::GetInstance()->DisableProcessing();
    });

    // Both runs below issue the same request; only the deadline passed to vacuum() differs.
    auto make_request = [] {
        VacuumRequest request;
        request.add_tablet_ids(20002);
        request.set_min_retain_version(4);
        request.set_grace_timestamp(1687331162);
        request.set_min_active_txn_id(12344);
        return request;
    };

    // Failure-message helper. |error_msgs| may be empty (for example when the status is
    // unexpectedly OK), so never index it unconditionally.
    auto first_error_msg = [](const VacuumResponse& resp) {
        return resp.status().error_msgs_size() > 0 ? resp.status().error_msgs(0) : std::string("<no error message>");
    };

    {
        // Run with a deadline: the mocked clock passes it on the fourth check.
        const VacuumRequest request = make_request();
        VacuumResponse response;
        vacuum(_tablet_mgr.get(), request, &response, /*deadline_ms=*/1);
        ASSERT_TRUE(response.has_status());
        EXPECT_EQ(TStatusCode::TIMEOUT, response.status().status_code()) << first_error_msg(response);
        EXPECT_GT(check_count, 3);
        EXPECT_EQ(0, response.vacuumed_files());
        EXPECT_TRUE(file_exist(tablet_metadata_filename(20002, 2)));
        EXPECT_TRUE(file_exist(tablet_metadata_filename(20002, 3)));
        EXPECT_TRUE(file_exist(tablet_metadata_filename(20002, 4)));
    }

    SyncPoint::GetInstance()->ClearCallBack("vacuum:check_deadline");

    {
        // Run again without a deadline: versions 2 and 3 are removed, version 4 is retained.
        const VacuumRequest request = make_request();
        VacuumResponse response;
        vacuum(_tablet_mgr.get(), request, &response);
        ASSERT_TRUE(response.has_status());
        EXPECT_EQ(0, response.status().status_code()) << first_error_msg(response);
        EXPECT_EQ(4, response.vacuumed_version());
        EXPECT_FALSE(file_exist(tablet_metadata_filename(20002, 2)));
        EXPECT_FALSE(file_exist(tablet_metadata_filename(20002, 3)));
        EXPECT_TRUE(file_exist(tablet_metadata_filename(20002, 4)));
    }
}

INSTANTIATE_TEST_SUITE_P(LakeVacuumTest, LakeVacuumTest,
::testing::Values(VacuumTestArg{1}, VacuumTestArg{3}, VacuumTestArg{100}));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,10 @@ private void vacuumPartitionImpl(Database db, OlapTable table, PhysicalPartition
vacuumRequest.deleteTxnLog = needDeleteTxnLog;
vacuumRequest.enableFileBundling = fileBundling;
vacuumRequest.enableSharedFileCleanup = enableSharedFileCleanup;
// The longest this FE waits for the response (the brpc timeout of the vacuum RPC).
// The BE checks it periodically during execution and aborts the task once it has
// elapsed, instead of running on as a zombie that no caller is waiting for.
vacuumRequest.timeoutMs = LakeService.TIMEOUT_VACUUM;
Comment on lines +321 to +324

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to make the vacuum timeout configurable.

// Perform deletion of txn log on the first node only.
needDeleteTxnLog = false;
try {
Expand Down
Loading
Loading