Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions include/mria.hrl
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
-ifndef(MRIA_HRL).
Comment thread
ieQu1 marked this conversation as resolved.
-define(MRIA_HRL, true).

-type(cluster() :: atom()).

-type(member_status() :: joining | up | healing | leaving | down).
Expand All @@ -19,3 +22,10 @@
-type(member() :: #member{}).

-define(JOIN_LOCK_ID(REQUESTER), {mria_sync_join, REQUESTER}).

-record(mria_replica_status_update,
{ shard :: mria_rlog:shard()
, status :: mria_rlog_replica:state()
}).

-endif.
28 changes: 28 additions & 0 deletions src/mria.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@
, wait_for_tables/1
]).

%% Misc. API:
-export([ subscribe_replica_events/2
, unsubscribe_replica_events/2
]).

-type info_key() :: members | running_nodes | stopped_nodes | partitions | rlog.

-type infos() :: #{members := list(member()),
Expand Down Expand Up @@ -546,6 +551,29 @@ dirty_delete_object(Tab, Record) ->
dirty_delete_object(Record) ->
dirty_delete_object(element(1, Record), Record).

%% @doc Subscribe a process to replica events for a given shard.
%%
%% These events are only emitted on replicant nodes,
%% when the local replica changes the state.
%%
%% Events are delivered as `#mria_replica_status_update' records.
%%
%% WARNING: this is an unstable API.
%% It may be broken without notice.
-spec subscribe_replica_events(mria_rlog:shard(), pid()) -> ok | {error, _}.
subscribe_replica_events(Shard, Pid) ->
case mria_rlog:role() of
replicant ->
mria_rlog_replica:subscribe_status_events(Shard, Pid);
_Role ->
{error, invalid_role}
end.
Comment thread
ieQu1 marked this conversation as resolved.

%% @doc Unsubscribe pid from replica status change events.
-spec unsubscribe_replica_events(mria_rlog:shard(), pid()) -> ok.
unsubscribe_replica_events(Shard, Pid) ->
mria_rlog_replica:unsubscribe_status_events(Shard, Pid).

%%================================================================================
%% Internal functions
%%================================================================================
Expand Down
3 changes: 2 additions & 1 deletion src/mria_rlog.erl
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ do_detect_shard({{Tab, _Key}, _Value, _Operation}) ->

-spec init() -> ok.
init() ->
mnesia_hook:register_hook(post_commit, fun ?MODULE:intercept_trans/2).
mnesia_hook:register_hook(post_commit, fun ?MODULE:intercept_trans/2),
mria_rlog_replica:create_tabs().
Comment thread
ieQu1 marked this conversation as resolved.

cleanup() ->
case mria_config:whoami() of
Expand Down
40 changes: 39 additions & 1 deletion src/mria_rlog_replica.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@

%% Internal exports:
-export([do_push_tlog_entry/2, push_tlog_entry/4]).
-export([create_tabs/0, subscribe_status_events/2, unsubscribe_status_events/2]).

-export_type([upstream/0]).
-export_type([upstream/0, state/0]).

-include_lib("mria.hrl").
-include("mria_rlog.hrl").
-include_lib("snabbkaffe/include/trace.hrl").

Expand Down Expand Up @@ -76,6 +78,8 @@
-define(name(SHARD, UPSTREAM), {n, l, {?MODULE, SHARD, UPSTREAM}}).
-define(via(SHARD, UPSTREAM), {via, gproc, ?name(SHARD, UPSTREAM)}).

-define(replica_events_tab, mria_replica_events_tab).

%%================================================================================
%% API funcions
%%================================================================================
Expand Down Expand Up @@ -200,6 +204,20 @@ push_tlog_entry(?TRANSPORT_GEN_RPC, Shard, {Node, Pid}, TLOGEntry) ->
gen_rpc:ordered_cast({Node, Shard}, ?MODULE, do_push_tlog_entry, [Pid, TLOGEntry]),
ok.

create_tabs() ->
ets:new(?replica_events_tab, [named_table, public, ordered_set]),
ok.

-spec subscribe_status_events(mria_rlog:shard(), pid()) -> ok.
subscribe_status_events(Shard, Pid) when is_pid(Pid) ->
ets:insert(?replica_events_tab, {{Shard, Pid}}),
ok.

-spec unsubscribe_status_events(mria_rlog:shard(), pid()) -> ok.
unsubscribe_status_events(Shard, Pid) ->
ets:delete(?replica_events_tab, {Shard, Pid}),
ok.

%%================================================================================
%% Internal functions
%%================================================================================
Expand Down Expand Up @@ -509,6 +527,7 @@ handle_state_trans(OldState, State, #d{shard = Shard, is_merge_shard = IsMerge,
false ->
mria_status:notify_replicant_state(Shard, State)
end,
broadcast_status_event(Shard, State),
keep_state_and_data.

-spec do_push_tlog_entry(pid(), mria_rlog:entry()) -> ok.
Expand Down Expand Up @@ -558,3 +577,22 @@ perform_autoclean(Shard, Upstream) ->
|| Table <- mria_schema:tables_of_shard(Shard),
mria_schema:get_merged_table_auto_clean(Table)],
ok.

-spec broadcast_status_event(mria_rlog:shard(), state()) -> ok.
broadcast_status_event(Shard, Status) ->
try
MS = {{{Shard, '$1'}}, [], ['$1']},
Pids = ets:select(?replica_events_tab, [MS]),
Event = #mria_replica_status_update{ shard = Shard
, status = Status
},
[I ! Event || I <- Pids],
ok
Comment thread
ieQu1 marked this conversation as resolved.
catch
EC:Err:Stack ->
?tp(warning, mria_failed_to_broadcast_replica_event,
#{ EC => Err
, stacktrace => Stack
}),
ok
end.
61 changes: 61 additions & 0 deletions test/mria_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2312,6 +2312,67 @@ t_is_peer_alive(_) ->
end,
[]).

t_replica_state_events(_) ->
Shard = Table = ?FUNCTION_NAME,
Cluster = [S1, _S2] = mria_ct:cluster([core, replicant], mria_mnesia_test_util:common_env()),
?check_trace(
#{timetrap => 30000},
try
%% Prepare:
Self = self(),
[C1, R1] = mria_ct:start_cluster(mria, Cluster),
mria_mnesia_test_util:stabilize(1000),
%% Verify that core cannot subscribe to replica events:
?assertEqual(
{error, invalid_role},
?ON(C1, mria:subscribe_replica_events(Shard, Self))),
%% Subscribe to events for a new shard:
?assertEqual(
ok,
?ON(R1, mria:subscribe_replica_events(Shard, Self))),
%% Create a table in new shard:
[?ON(I,
begin
ok = mria:create_table(Table, [{rlog_shard, Shard}]),
ok = mria:wait_for_tables([Table])
end)
|| I <- [C1, R1]],
%% This should create series of events:
ct:sleep(100),
?assertEqual(
[ #mria_replica_status_update{shard = t_replica_state_events, status = disconnected}
, #mria_replica_status_update{shard = t_replica_state_events, status = bootstrap}
, #mria_replica_status_update{shard = t_replica_state_events, status = local_replay}
, #mria_replica_status_update{shard = t_replica_state_events, status = normal}
],
mria_ct:mailbox()),
%% Shut down core node, replicant should react:
slave:stop(C1),
ct:sleep(1000),
?assertEqual(
[ #mria_replica_status_update{shard = t_replica_state_events, status = disconnected}
],
mria_ct:mailbox()),
%% Restart core:
C1 = mria_ct:start_slave(mria, S1),
ok = ?ON(C1, mria:create_table(Table, [{rlog_shard, Shard}])),
ok = ?ON(C1, mria:wait_for_tables([Table])),
ct:sleep(5_000),
?assertEqual(
[ #mria_replica_status_update{shard = t_replica_state_events, status = bootstrap}
, #mria_replica_status_update{shard = t_replica_state_events, status = local_replay}
, #mria_replica_status_update{shard = t_replica_state_events, status = normal}
],
mria_ct:mailbox()),
?assertEqual(
ok,
?ON(R1, mria:unsubscribe_replica_events(Shard, Self))),
ok
after
ok = mria_ct:teardown_cluster(Cluster)
end,
[]).

get_preferred_core_node(Shard, Replicant) ->
?ON(Replicant,
begin
Expand Down