Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/handle_append_entries.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,7 @@ ptr<resp_msg> raft_server::handle_append_entries(req_msg& req)
idx, old_entry->get_term() );

} else if (old_entry->get_val_type() == log_val_type::conf) {
buf->pos(0);
ptr<cluster_config> conf_to_rollback =
cluster_config::deserialize(*buf);
state_machine_->rollback_config(idx, conf_to_rollback);
Expand Down
152 changes: 113 additions & 39 deletions src/handle_commit.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ limitations under the License.
#include "state_mgr.hxx"
#include "tracer.hxx"

#include <algorithm>
#include <cassert>
#include <list>
#include <sstream>
#include <random>
#include <sstream>
#include <stdexcept>

namespace nuraft {

Expand Down Expand Up @@ -448,6 +450,7 @@ void raft_server::commit_conf(ulong idx_to_commit,
} else {
p_in( "skipped config %" PRIu64 ", latest config %" PRIu64 "",
new_conf->get_log_idx(), cur_conf->get_log_idx() );
update_target_priority();
}

cb_func::Param param(id_, leader_);
Expand Down Expand Up @@ -494,6 +497,78 @@ bool raft_server::apply_config_log_entry(ptr<log_entry>& le,
return true;
}

static bool is_valid_config_at_or_before(const ptr<cluster_config>& conf,
ulong log_idx) {
return conf &&
!conf->get_servers().empty() &&
conf->get_log_idx() <= log_idx;
}

static ptr<cluster_config>
find_config_at_or_before(const ptr<cluster_config>& current_conf,
const ptr<snapshot>& last_snp,
const ptr<log_store>& log_store,
ulong log_idx,
std::string& err_msg) {
if (is_valid_config_at_or_before(current_conf, log_idx)) {
return current_conf;
}

if (!log_store) {
err_msg = "log store is null";
return nullptr;
}

const ulong start_idx = log_store->start_index();
const ulong next_slot = log_store->next_slot();
const ulong last_log_idx = next_slot > 0 ? next_slot - 1 : 0;
const ulong end_idx = std::min(log_idx, last_log_idx);

if (end_idx >= start_idx) {
for (ulong idx = end_idx;; --idx) {
if (log_store->is_conf(idx)) {
ptr<log_entry> le = log_store->entry_at(idx);
if (!le.get()) {
err_msg = "config log entry is null";
return nullptr;
}
if (le->is_buf_null()) {
err_msg = "config log entry buffer is null";
return nullptr;
}

le->get_buf().pos(0);
ptr<cluster_config> candidate =
cluster_config::deserialize(le->get_buf());
if (is_valid_config_at_or_before(candidate, log_idx)) {
return candidate;
}
}

if (idx == start_idx) {
break;
}
}
}

if (last_snp &&
last_snp->get_last_log_idx() <= log_idx &&
is_valid_config_at_or_before(last_snp->get_last_config(), log_idx)) {
return last_snp->get_last_config();
}

std::ostringstream ss;
ss << "no valid config at or before cutoff " << log_idx
<< ", log_store start_index " << start_idx
<< ", next_slot " << next_slot
<< ", current config index "
<< (current_conf ? current_conf->get_log_idx() : 0)
<< ", snapshot index "
<< (last_snp ? last_snp->get_last_log_idx() : 0);
err_msg = ss.str();
return nullptr;
}

ulong raft_server::create_snapshot(const create_snapshot_options& options) {
auto exec_internal = [&]() {
uint64_t committed_idx = sm_commit_index_;
Expand Down Expand Up @@ -531,11 +606,6 @@ bool raft_server::snapshot_and_compact(ulong committed_idx, bool forced_creation

// get the latest configuration info
ptr<cluster_config> conf = get_config();
if ( conf->get_prev_log_idx() >= log_store_->next_slot() ) {
// The latest config and previous config is not in log_store,
// so skip the snapshot creation.
return false;
}

auto snapshot_distance = (ulong)params->snapshot_distance_;
// Randomized snapshot distance for the first creation.
Expand Down Expand Up @@ -623,41 +693,46 @@ bool raft_server::snapshot_and_compact(ulong committed_idx, bool forced_creation
p_in("snapshot creation is scheduled by user");
}

while ( conf->get_log_idx() > committed_idx &&
conf->get_prev_log_idx() >= log_store_->start_index() ) {
ptr<log_entry> conf_log
( log_store_->entry_at( conf->get_prev_log_idx() ) );
conf = cluster_config::deserialize(conf_log->get_buf());
}
auto fail_snapshot_creation =
[&](const std::string& err_msg) {
if (snapshot_in_action) {
bool val = true;
snp_in_progress_.compare_exchange_strong(val, false);
}
if (manual_creation_cb) {
ptr<std::exception> err =
cs_new<std::runtime_error>(err_msg);
manual_creation_cb->set_result(committed_idx, err,
cmd_result_code::FAILED);
sched_snp_creation_result_.reset();
snp_creation_scheduled_ = false;
}
};

if ( conf->get_log_idx() > committed_idx &&
conf->get_prev_log_idx() > 0 &&
conf->get_prev_log_idx() < log_store_->start_index() ) {
if (!local_snp) {
// LCOV_EXCL_START
p_er("No snapshot could be found while no configuration "
"cannot be found in current committed logs, "
"this is a system error, exiting");
ctx_->state_mgr_->system_exit(raft_err::N6_no_snapshot_found);
_sys_exit(-1);
if (!is_valid_config_at_or_before(conf, committed_idx)) {
std::string err_msg;
ptr<cluster_config> validated_conf =
find_config_at_or_before(conf, local_snp, log_store_,
committed_idx, err_msg);
if (!validated_conf) {
p_er("failed to find a valid config at or before committed index "
"%" PRIu64 " while creating snapshot: %s",
committed_idx, err_msg.c_str());
fail_snapshot_creation("no valid config for snapshot at committed idx");
return false;
// LCOV_EXCL_STOP
}
conf = local_snp->get_last_config();

} else if ( conf->get_log_idx() > committed_idx &&
conf->get_prev_log_idx() == 0 ) {
// Modified by Jung-Sang Ahn in May, 2018:
// Since we remove configure from state machine
// (necessary when we clone a node to another node),
// config at log idx 1 may not be visiable in some condition.
p_wn("config at log idx 1 is not availabe, "
"config log idx %" PRIu64 ", prev log idx %" PRIu64
", committed idx %" PRIu64,
conf->get_log_idx(), conf->get_prev_log_idx(), committed_idx);
//ctx_->state_mgr_->system_exit(raft_err::N7_no_config_at_idx_one);
//_sys_exit(-1);
//return;
p_in("snapshot creation using committed config idx %" PRIu64
" instead of current config idx %" PRIu64,
validated_conf->get_log_idx(),
conf ? conf->get_log_idx() : 0);
conf = validated_conf;
}

if ( conf->get_prev_log_idx() >= log_store_->next_slot() ) {
// The latest config and previous config is not in log_store,
// so skip the snapshot creation.
fail_snapshot_creation("snapshot config is not available in log store");
return false;
}

ulong log_term_to_compact = log_store_->term_at(committed_idx);
Expand Down Expand Up @@ -1064,4 +1139,3 @@ bool raft_server::wait_for_state_machine_pause(size_t timeout_ms) {
}

} // namespace nuraft;

56 changes: 42 additions & 14 deletions src/handle_snapshot_sync.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ limitations under the License.

namespace nuraft {

static bool is_valid_snapshot_config(const ptr<cluster_config>& conf,
ulong snapshot_last_log_idx) {
return conf &&
!conf->get_servers().empty() &&
conf->get_log_idx() <= snapshot_last_log_idx;
}

int32 raft_server::get_snapshot_sync_block_size() const {
int32 block_size = ctx_->get_params()->snapshot_block_size_;
return block_size == 0 ? default_snapshot_sync_block_size : block_size;
Expand Down Expand Up @@ -595,19 +602,41 @@ bool raft_server::handle_snapshot_sync_req(snapshot_sync_req& req, std::unique_l

auto snap_conf = req.get_snapshot().get_last_config();
ptr<cluster_config> c_conf = get_config();
if (snap_conf->get_log_idx() > c_conf->get_log_idx()) {
ctx_->state_mgr_->save_config(*snap_conf);
reconfigure(snap_conf);
c_conf = get_config();
} else {
p_in("snapshot config idx %" PRIu64 " prev idx %" PRIu64
" is not newer than "
"current config idx %" PRIu64 " prev idx %" PRIu64
ptr<cluster_config> snapshot_meta_conf;
const ulong snapshot_last_log_idx = req.get_snapshot().get_last_log_idx();
if (!is_valid_snapshot_config(snap_conf, snapshot_last_log_idx)) {
p_wn("snapshot config idx %" PRIu64 " server count %" PRIu64
" is invalid for snapshot idx %" PRIu64
", will not apply it",
snap_conf->get_log_idx(),
snap_conf->get_prev_log_idx(),
c_conf->get_log_idx(),
c_conf->get_prev_log_idx());
snap_conf ? snap_conf->get_log_idx() : 0,
snap_conf ? (ulong)snap_conf->get_servers().size() : 0,
snapshot_last_log_idx);
} else {
snapshot_meta_conf = snap_conf;
if (!c_conf || snap_conf->get_log_idx() > c_conf->get_log_idx()) {
ctx_->state_mgr_->save_config(*snap_conf);
reconfigure(snap_conf);
c_conf = get_config();
} else {
p_in("snapshot config idx %" PRIu64 " prev idx %" PRIu64
" is not newer than "
"current config idx %" PRIu64 " prev idx %" PRIu64
", will not apply it",
snap_conf->get_log_idx(),
snap_conf->get_prev_log_idx(),
c_conf->get_log_idx(),
c_conf->get_prev_log_idx());
}
}

if (!snapshot_meta_conf && is_valid_snapshot_config(c_conf, snapshot_last_log_idx)) {
snapshot_meta_conf = c_conf;
}
if (!snapshot_meta_conf) {
p_wn("no config valid for snapshot idx %" PRIu64
", storing non-authoritative empty snapshot config",
snapshot_last_log_idx);
snapshot_meta_conf = cs_new<cluster_config>();
}

precommit_index_ = req.get_snapshot().get_last_log_idx();
Expand All @@ -620,7 +649,7 @@ bool raft_server::handle_snapshot_sync_req(snapshot_sync_req& req, std::unique_l
ptr<snapshot> new_snp = cs_new<snapshot>
( req.get_snapshot().get_last_log_idx(),
req.get_snapshot().get_last_log_term(),
c_conf,
snapshot_meta_conf,
req.get_snapshot().size(),
req.get_snapshot().get_type() );
set_last_snapshot(new_snp);
Expand Down Expand Up @@ -658,4 +687,3 @@ bool raft_server::handle_snapshot_sync_req(snapshot_sync_req& req, std::unique_l
}

} // namespace nuraft;

76 changes: 76 additions & 0 deletions tests/unit/snapshot_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,20 @@ static ptr<buffer> make_resp_appendix_ctx(uint8_t order) {
return ctx;
}

static ptr<log_entry> make_conf_log_entry(ptr<cluster_config>& conf,
ulong term) {
ptr<buffer> conf_buf = conf->serialize();
return cs_new<log_entry>(term, conf_buf, log_val_type::conf);
}

static ptr<log_entry> make_u64_app_log_entry(ulong value,
ulong term) {
ptr<buffer> app_buf = buffer::alloc(8);
buffer_serializer bs(app_buf);
bs.put_u64(value);
return cs_new<log_entry>(term, app_buf, log_val_type::app_log);
}

static int append_and_replicate(RaftPkg& leader,
const std::vector<RaftPkg*>& pkgs,
size_t begin,
Expand Down Expand Up @@ -1668,6 +1682,65 @@ int snapshot_rewind_floor_test() {
return 0;
}

int snapshot_stale_config_chain_uses_committed_config_test() {
reset_log_files();
ptr<FakeNetworkBase> f_base = cs_new<FakeNetworkBase>();

std::string s1_addr = "S1";
RaftPkg s1(f_base, 1, s1_addr);

auto init_cb = [&](RaftPkg* pkg) {
ptr<cluster_config> conf1 = cs_new<cluster_config>();
conf1->get_servers().push_back(pkg->getTestMgr()->get_srv_config());
conf1->set_log_idx(1);

ptr<buffer> conf1_buf = conf1->serialize();
ptr<cluster_config> conf3 = cluster_config::deserialize(*conf1_buf);
conf3->set_log_idx(2);
conf3->set_log_idx(3);

ptr<inmem_log_store> log_store =
pkg->getTestMgr()->get_inmem_log_store();
ptr<log_entry> conf1_le = make_conf_log_entry(conf1, 1);
ptr<log_entry> app_le = make_u64_app_log_entry(42, 1);
ptr<log_entry> conf3_le = make_conf_log_entry(conf3, 1);
log_store->append(conf1_le);
log_store->append(app_le);
log_store->append(conf3_le);

pkg->sMgr->save_config(*conf3);

ptr<buffer> dummy = buffer::alloc(8);
buffer_serializer bs(dummy);
bs.put_u64(0);
pkg->getTestSm()->commit(2, *dummy);
pkg->getTestSm()->commit_config(1, conf1);
};

CHK_Z( launch_servers( {&s1}, nullptr, false, cb_default, init_cb ) );

CHK_EQ( 3, s1.raftServer->get_config()->get_log_idx() );
CHK_EQ( 2, s1.raftServer->get_config()->get_prev_log_idx() );
CHK_EQ( 2, s1.getTestSm()->last_commit_index() );
CHK_EQ( log_val_type::app_log,
s1.getTestMgr()->get_inmem_log_store()->entry_at(2)->get_val_type() );
CHK_EQ( 0, s1.getTestSm()->getNumSnapshotCreations() );

ulong snap_idx = s1.raftServer->create_snapshot();

CHK_EQ( 2, snap_idx );
CHK_EQ( 1, s1.getTestSm()->getNumSnapshotCreations() );
CHK_TRUE( s1.getTestSm()->last_snapshot() );
CHK_EQ( 2, s1.getTestSm()->last_snapshot()->get_last_log_idx() );
CHK_TRUE( s1.getTestSm()->last_snapshot()->get_last_config() );
CHK_EQ( 1, s1.getTestSm()->last_snapshot()->get_last_config()->get_log_idx() );

s1.raftServer->shutdown();
f_base->destroy();

return 0;
}

} // namespace snapshot_test
using namespace snapshot_test;

Expand Down Expand Up @@ -1727,6 +1800,9 @@ int main(int argc, char* argv[]) {
ts.doTest( "snapshot rewind floor test",
snapshot_rewind_floor_test );

ts.doTest( "snapshot stale config chain uses committed config",
snapshot_stale_config_chain_uses_committed_config_test );

#ifdef ENABLE_RAFT_STATS
_msg("raft stats: ENABLED\n");
#else
Expand Down