Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion include/libnuraft/peer.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ public:
void send_req(ptr<peer> myself,
ptr<req_msg>& req,
rpc_handler& handler,
bool streaming = false);
bool streaming = false,
uint64_t send_timeout_ms = 0);

void shutdown();

Expand Down
2 changes: 1 addition & 1 deletion include/libnuraft/raft_server.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -1082,7 +1082,7 @@ protected:
bool check_cond_for_zp_election();
void request_prevote();
void initiate_vote(bool force_vote = false);
void request_vote(bool force_vote);
void request_vote(bool force_vote, uint64_t election_rpc_timeout_ms);
void request_append_entries();
bool request_append_entries(ptr<peer> p);
bool send_request(ptr<peer>& p,
Expand Down
705 changes: 453 additions & 252 deletions src/asio_service.cxx

Large diffs are not rendered by default.

58 changes: 52 additions & 6 deletions src/handle_vote.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,24 @@ limitations under the License.

namespace nuraft {

namespace {

/**
 * Choose the send timeout for election RPCs (pre-vote / vote requests).
 *
 * Candidates are examined in the order below and the first positive one
 * is used; each value is returned as-is, i.e. treated as milliseconds:
 *   1) `params.election_timeout_upper_bound_`
 *   2) `params.election_timeout_lower_bound_`
 *   3) `params.heart_beat_interval_ * response_limit`
 *      (only when both factors are positive).
 *
 * @param params Raft parameters the candidates are read from.
 * @param response_limit Multiplier applied to the heartbeat interval
 *                       for the last fallback.
 * @return The selected timeout, or 0 when no candidate is positive.
 */
uint64_t get_election_rpc_timeout_ms(const raft_params& params, int32 response_limit) {
    uint64_t timeout_ms = 0;

    if (params.election_timeout_upper_bound_ > 0) {
        // Preferred source: the upper bound of the election timeout.
        timeout_ms = static_cast<uint64_t>(params.election_timeout_upper_bound_);
    } else if (params.election_timeout_lower_bound_ > 0) {
        // Upper bound is not positive; fall back to the lower bound.
        timeout_ms = static_cast<uint64_t>(params.election_timeout_lower_bound_);
    } else if (params.heart_beat_interval_ > 0 && response_limit > 0) {
        // Last resort: derive it from the heartbeat interval. Both factors
        // are widened to 64 bits before multiplying.
        const uint64_t interval_ms =
            static_cast<uint64_t>(params.heart_beat_interval_);
        const uint64_t multiplier = static_cast<uint64_t>(response_limit);
        timeout_ms = interval_ms * multiplier;
    }

    return timeout_ms;
}

} // namespace

bool raft_server::check_cond_for_zp_election() {
ptr<raft_params> params = ctx_->get_params();
if ( params->allow_temporary_zero_priority_leader_ &&
Expand All @@ -47,6 +65,20 @@ bool raft_server::check_cond_for_zp_election() {

void raft_server::request_prevote() {
ptr<raft_params> params = ctx_->get_params();
const int32 response_limit = raft_server::raft_limits_.response_limit_.load();
const uint64_t election_rpc_timeout_ms =
get_election_rpc_timeout_ms(*params, response_limit);
if (!election_rpc_timeout_ms) {
p_er("cannot send election RPC with disabled timeout: "
"election_timeout_upper_bound %d, election_timeout_lower_bound %d, "
"heart_beat_interval %d, response_limit %d",
params->election_timeout_upper_bound_,
params->election_timeout_lower_bound_,
params->heart_beat_interval_,
response_limit);
return;
}

ptr<cluster_config> c_config = get_config();
for (peer_itor it = peers_.begin(); it != peers_.end(); ++it) {
ptr<peer> pp = it->second;
Expand Down Expand Up @@ -165,7 +197,7 @@ void raft_server::request_prevote() {
log_store_->next_slot() - 1,
quick_commit_index_.load() ) );
if (pp->make_busy()) {
pp->send_req(pp, req, resp_handler_);
pp->send_req(pp, req, resp_handler_, false, election_rpc_timeout_ms);
} else {
pre_vote_.connection_busy_++;
p_wn("failed to send prevote request: peer %d (%s) is busy, count %d",
Expand Down Expand Up @@ -197,7 +229,8 @@ void raft_server::request_prevote() {
}

void raft_server::initiate_vote(bool force_vote) {
int grace_period = ctx_->get_params()->grace_period_of_lagging_state_machine_;
ptr<raft_params> params = ctx_->get_params();
int grace_period = params->grace_period_of_lagging_state_machine_;
ulong cur_term = state_->get_term();
if ( !force_vote &&
grace_period &&
Expand Down Expand Up @@ -235,6 +268,20 @@ void raft_server::initiate_vote(bool force_vote) {
check_cond_for_zp_election() ||
( get_quorum_for_election() == 0 &&
my_priority_ > 0 ) ) {
const int32 response_limit = raft_server::raft_limits_.response_limit_.load();
const uint64_t election_rpc_timeout_ms =
get_election_rpc_timeout_ms(*params, response_limit);
if (!election_rpc_timeout_ms) {
p_er("cannot send election RPC with disabled timeout: "
"election_timeout_upper_bound %d, election_timeout_lower_bound %d, "
"heart_beat_interval %d, response_limit %d",
params->election_timeout_upper_bound_,
params->election_timeout_lower_bound_,
params->heart_beat_interval_,
response_limit);
return;
}

// Request vote when
// 1) my priority satisfies the target, OR
// 2) I'm the only node in the group.
Expand All @@ -247,7 +294,7 @@ void raft_server::initiate_vote(bool force_vote) {
election_completed_ = false;
// NOTE: Following `request_vote` will call `save_state()`,
// hence we don't call it here even though `state_` changes.
request_vote(force_vote);
request_vote(force_vote, election_rpc_timeout_ms);
}

if (role_ != srv_role::leader) {
Expand All @@ -256,7 +303,7 @@ void raft_server::initiate_vote(bool force_vote) {
}
}

void raft_server::request_vote(bool force_vote) {
void raft_server::request_vote(bool force_vote, uint64_t election_rpc_timeout_ms) {
state_->set_voted_for(id_);
ctx_->state_mgr_->save_state(*state_);
votes_granted_ += 1;
Expand Down Expand Up @@ -305,7 +352,7 @@ void raft_server::request_vote(bool force_vote) {
it->second->get_id(),
state_->get_term() );
if (pp->make_busy()) {
pp->send_req(pp, req, resp_handler_);
pp->send_req(pp, req, resp_handler_, false, election_rpc_timeout_ms);
} else {
p_wn("failed to send vote request: peer %d (%s) is busy",
pp->get_id(), pp->get_endpoint().c_str());
Expand Down Expand Up @@ -552,4 +599,3 @@ void raft_server::handle_prevote_resp(resp_msg& resp) {
}

} // namespace nuraft;

100 changes: 79 additions & 21 deletions src/peer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,25 @@ limitations under the License.

#include "debugging_options.hxx"
#include "raft_server.hxx"
#include "strfmt.hxx"
#include "tracer.hxx"

#include <atomic>
#include <memory>
#include <unordered_set>

namespace nuraft {

void peer::send_req( ptr<peer> myself,
ptr<req_msg>& req,
rpc_handler& handler,
bool streaming )
bool streaming,
uint64_t send_timeout_ms )
{
if (abandoned_) {
p_er("peer %d has been shut down, cannot send request",
get_config().get_id());
set_free();
return;
}

Expand Down Expand Up @@ -66,20 +71,68 @@ void peer::send_req( ptr<peer> myself,
}
}

rpc_handler h = (rpc_handler)std::bind
( &peer::handle_rpc_result,
this,
myself,
rpc_local->get_id(),
req,
pending,
streaming,
req_size_bytes,
std::placeholders::_1,
std::placeholders::_2 );
if (rpc_local) {
myself->bytes_in_flight_add(req_size_bytes);
rpc_local->send(req, h);
uint64_t rpc_client_id = rpc_local->get_id();
auto rpc_callback_invoked = std::make_shared<std::atomic<bool>>(false);
rpc_handler h =
[this,
myself,
rpc_client_id,
req,
pending,
streaming,
req_size_bytes,
rpc_callback_invoked]
(ptr<resp_msg>& resp, ptr<rpc_exception>& err) mutable
{
rpc_callback_invoked->store(true, std::memory_order_relaxed);
handle_rpc_result(myself,
rpc_client_id,
req,
pending,
streaming,
req_size_bytes,
resp,
err);
};
myself->bytes_in_flight_add(req_size_bytes);
try {
rpc_local->send(req, h, send_timeout_ms);
} catch (const std::exception& e) {
if (rpc_callback_invoked->load(std::memory_order_relaxed)) {
throw;
}

std::string err_msg =
lstrfmt("synchronous exception from rpc_client::send to peer %d: %s")
.fmt(req->get_dst(), e.what());
ptr<resp_msg> no_resp;
ptr<rpc_exception> err = cs_new<rpc_exception>(err_msg, req);
handle_rpc_result(myself,
rpc_client_id,
req,
pending,
streaming,
req_size_bytes,
no_resp,
err);
} catch (...) {
if (rpc_callback_invoked->load(std::memory_order_relaxed)) {
throw;
}

std::string err_msg =
lstrfmt("synchronous unknown exception from rpc_client::send to peer %d")
.fmt(req->get_dst());
ptr<resp_msg> no_resp;
ptr<rpc_exception> err = cs_new<rpc_exception>(err_msg, req);
handle_rpc_result(myself,
rpc_client_id,
req,
pending,
streaming,
req_size_bytes,
no_resp,
err);
}
}

Expand Down Expand Up @@ -152,13 +205,16 @@ void peer::handle_rpc_result( ptr<peer> myself,
auto_lock(lock_);
resume_hb_speed();
}
ptr<rpc_exception> no_except;
resp->set_peer(myself);
pending_result->set_result(resp, no_except);

reconn_backoff_.reset();
reconn_backoff_.set_duration_ms(1);

ptr<rpc_exception> no_except;
resp->set_peer(myself);
// `set_result` invokes user handlers synchronously, so all peer
// lifecycle cleanup must stay before this call.
pending_result->set_result(resp, no_except);

} else {
// Failed.

Expand All @@ -169,8 +225,6 @@ void peer::handle_rpc_result( ptr<peer> myself,
auto_lock(lock_);
slow_down_hb();
}
ptr<resp_msg> no_resp;
pending_result->set_result(no_resp, err);

// Destroy this connection, we MUST NOT re-use existing socket.
// Next append operation will create a new one.
Expand Down Expand Up @@ -217,6 +271,11 @@ void peer::handle_rpc_result( ptr<peer> myself,
}
}
}

ptr<resp_msg> no_resp;
// `set_result` invokes user handlers synchronously, so all peer
// lifecycle cleanup must stay before this call.
pending_result->set_result(no_resp, err);
}
}

Expand Down Expand Up @@ -327,4 +386,3 @@ void peer::reopen(context& ctx, timer_task<int32>::executor& hb_exec) {
}

} // namespace nuraft;

Loading
Loading