Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions src/mria.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
, leave/0
, force_leave/1

, is_peer_alive/1
, running_nodes/0
, cluster_nodes/1
, cluster_status/1
Expand Down Expand Up @@ -212,6 +213,22 @@ cluster_status(Node) ->
%% Tell whether `Node' is listed among the nodes returned by `cluster_nodes(all)'.
is_node_in_cluster(Node) ->
    AllNodes = cluster_nodes(all),
    lists:member(Node, AllNodes).

%% @doc Check (synchronously) if any of the alive peers considers a peer node alive.
%%
%% Returns `{ok, IsAlive}' when the check transaction commits. If the
%% transaction does not commit, its result (e.g. `{aborted, Reason}') is
%% returned to the caller unchanged.
%%
%% WARNING:
%%
%% 1. This is a heavy operation involving RPC
%% 2. It may report false positives when called immediately after `nodedown' event:
%%    other nodes may react to `nodedown' with a delay.
%%
%% Because of 2, it may be necessary to retry this operation.
-spec is_peer_alive(node()) -> {ok, boolean()} | {aborted, _}.
is_peer_alive(Node) ->
    maybe
        %% Conditional match (`?='): a result that is not `{atomic, _}'
        %% short-circuits the block and becomes the return value, instead of
        %% crashing the caller with `badmatch'.
        {atomic, Result} ?= transaction(?mria_meta_shard, fun is_peer_alive_trans/1, [Node]),
        {ok, Result}
    end.

%% @doc Running nodes.
%% This function should be used with care, as it may not return the most up-to-date
%% view of replicant nodes, as changes in mria_membership are reflected asynchronously.
Expand Down Expand Up @@ -812,3 +829,26 @@ rem_time(Timeout, T0, T1) ->
max(0, Timeout - (T1 - T0));
true -> infinity
end.

%% Transaction body used by is_peer_alive/1.
%%
%% `Node' is reported alive when it is one of the running core nodes.
%% Otherwise every running core is asked (via `erpc:multicall', 5 second
%% timeout) for its list of running replicants, and `Node' is reported alive
%% if any core lists it. A core that fails to answer also counts as "alive".
-spec is_peer_alive_trans(node()) -> boolean().
is_peer_alive_trans(Node) ->
    Cores = mria_mnesia:running_nodes(),
    %% Short-circuit: the replicant lookup (and its RPC) only happens when
    %% `Node' is not a running core.
    lists:member(Node, Cores) orelse
        lists:any(
          fun({ok, Replicants}) ->
                  lists:member(Node, Replicants);
             (_RpcFailure) ->
                  %% RPC error, default to `true'.
                  true
          end,
          erpc:multicall(Cores, mria_membership, running_replicant_nodelist, [], 5_000)).
49 changes: 48 additions & 1 deletion test/mria_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2146,7 +2146,7 @@ t_merge_table_bootstrap(_) ->
, ?snk_span := {complete, _}
})
|| I <- Nodes, J <- Nodes, I =/= J],
ct:sleep(1000),
ct:sleep(5000),
%% Verify that data on all nodes is consistent:
[?defer_assert(
?assertEqual(
Expand Down Expand Up @@ -2264,6 +2264,53 @@ t_merge_table_autoclean(_) ->
end,
[]).

%% Exercise mria:is_peer_alive/1 on a cluster of two cores and two replicants:
%%
%% 1. Initially every node must report every node (itself included) as alive.
%% 2. A node name that is not part of the cluster must be reported as not alive.
%% 3. Each node in turn is stopped, must be reported as not alive by all the
%%    other nodes, is then restarted from its original spec and must be
%%    reported as alive again by every node.
%%
%% In the assertion context maps, `on' is the node where the check is executed
%% and `target' is the node being checked.
t_is_peer_alive(_) ->
    Cluster = mria_ct:cluster([core, core, replicant, replicant], mria_mnesia_test_util:common_env()),
    ?check_trace(
       #{timetrap => 30000},
       try
           Nodes = mria_ct:start_cluster(mria, Cluster),
           mria_mnesia_test_util:stabilize(1000),
           %% All peers should be alive initially
           [?assertEqual(
               {ok, true},
               rpc:call(I, mria, is_peer_alive, [J]),
               #{on => I, target => J})
            || I <- Nodes,
               J <- Nodes],
           %% Non-existent node should be reported as not alive
           [?assertEqual(
               {ok, false},
               rpc:call(I, mria, is_peer_alive, ['nonexistent@127.0.0.1']),
               #{on => I})
            || I <- Nodes],
           %% Restart nodes one by one and verify that peers report state correctly
           [begin
                ok = ?tp_span(notice, test_stopping_node, #{node => I},
                              slave:stop(I)),
                %% Give the remaining nodes time to react to the node going down.
                ct:sleep(1000),
                [?assertEqual(
                    {ok, false},
                    rpc:call(J, mria, is_peer_alive, [I]),
                    #{on => J, target => I})
                 || J <- Nodes,
                    I =/= J],
                %% Restart (the match asserts the node comes back under the same name):
                I = ?tp_span(notice, test_restarting_node, #{node => I},
                             mria_ct:start_slave(mria, Spec)),
                ct:sleep(1000),
                %% Verify that the node is reported as up again:
                [?assertEqual(
                    {ok, true},
                    rpc:call(J, mria, is_peer_alive, [I]),
                    #{on => J, target => I})
                 || J <- Nodes]
            end
            || {I, Spec} <- lists:zip(Nodes, Cluster)]
       after
           ok = mria_ct:teardown_cluster(Cluster)
       end,
       []).

get_preferred_core_node(Shard, Replicant) ->
?ON(Replicant,
Expand Down