Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions include/mria.hrl
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
-ifndef(MRIA_HRL).
Comment thread
ieQu1 marked this conversation as resolved.

-type(cluster() :: atom()).

-type(member_status() :: joining | up | healing | leaving | down).
Expand All @@ -19,3 +21,10 @@
-type(member() :: #member{}).

-define(JOIN_LOCK_ID(REQUESTER), {mria_sync_join, REQUESTER}).

-record(mria_replica_status_update,
{ shard :: mria_rlog:shard()
, status :: mria_rlog_replica:state()
}).

-endif.
28 changes: 28 additions & 0 deletions src/mria.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@
, wait_for_tables/1
]).

%% Misc. API:
-export([ subscribe_replica_events/2
, unsubscribe_replica_events/2
]).

-type info_key() :: members | running_nodes | stopped_nodes | partitions | rlog.

-type infos() :: #{members := list(member()),
Expand Down Expand Up @@ -546,6 +551,29 @@ dirty_delete_object(Tab, Record) ->
dirty_delete_object(Record) ->
dirty_delete_object(element(1, Record), Record).

%% @doc Subscribe a process to replica events for a given shard.
%%
%% These events are only emitted on replicant nodes,
%% when the local replica changes the state.
%%
%% Events are delivered as `#mria_replica_status_update' records.
%%
%% WARNING: this is an unstable API.
%% It may be broken without notice.
-spec subscribe_replica_events(mria_rlog:shard(), pid()) -> ok | {error, _}.
subscribe_replica_events(Shard, Pid) ->
case mria_rlog:role() of
replicant ->
mria_rlog_replica:subscribe_status_events(Shard, Pid);
_Role ->
{error, invalid_role}
end.
Comment thread
ieQu1 marked this conversation as resolved.

%% @doc Unsubscribe pid from replica status change events.
-spec unsubscribe_replica_events(mria_rlog:shard(), pid()) -> ok.
unsubscribe_replica_events(Shard, Pid) ->
mria_rlog_replica:unsubscribe_status_events(Shard, Pid).

%%================================================================================
%% Internal functions
%%================================================================================
Expand Down
3 changes: 2 additions & 1 deletion src/mria_rlog.erl
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ do_detect_shard({{Tab, _Key}, _Value, _Operation}) ->

-spec init() -> ok.
init() ->
mnesia_hook:register_hook(post_commit, fun ?MODULE:intercept_trans/2).
mnesia_hook:register_hook(post_commit, fun ?MODULE:intercept_trans/2),
mria_rlog_replica:create_tabs().
Comment thread
ieQu1 marked this conversation as resolved.

cleanup() ->
case mria_config:whoami() of
Expand Down
39 changes: 38 additions & 1 deletion src/mria_rlog_replica.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@

%% Internal exports:
-export([do_push_tlog_entry/2, push_tlog_entry/4]).
-export([create_tabs/0, subscribe_status_events/2, unsubscribe_status_events/2]).

-export_type([upstream/0]).
-export_type([upstream/0, state/0]).

-include_lib("mria.hrl").
-include("mria_rlog.hrl").
-include_lib("snabbkaffe/include/trace.hrl").

Expand Down Expand Up @@ -76,6 +78,8 @@
-define(name(SHARD, UPSTREAM), {n, l, {?MODULE, SHARD, UPSTREAM}}).
-define(via(SHARD, UPSTREAM), {via, gproc, ?name(SHARD, UPSTREAM)}).

-define(replica_events_tab, mria_replica_events_tab).

%%================================================================================
%% API funcions
%%================================================================================
Expand Down Expand Up @@ -200,6 +204,19 @@ push_tlog_entry(?TRANSPORT_GEN_RPC, Shard, {Node, Pid}, TLOGEntry) ->
gen_rpc:ordered_cast({Node, Shard}, ?MODULE, do_push_tlog_entry, [Pid, TLOGEntry]),
ok.

create_tabs() ->
ets:new(?replica_events_tab, [named_table, public, ordered_set]).

-spec subscribe_status_events(mria_rlog:shard(), pid()) -> ok.
subscribe_status_events(Shard, Pid) ->
ets:insert(?replica_events_tab, {{Shard, Pid}}),
ok.

-spec unsubscribe_status_events(mria_rlog:shard(), pid()) -> ok.
unsubscribe_status_events(Shard, Pid) ->
ets:delete(?replica_events_tab, {Shard, Pid}),
ok.

%%================================================================================
%% Internal functions
%%================================================================================
Expand Down Expand Up @@ -509,6 +526,7 @@ handle_state_trans(OldState, State, #d{shard = Shard, is_merge_shard = IsMerge,
false ->
mria_status:notify_replicant_state(Shard, State)
end,
broadcast_status_event(Shard, State),
keep_state_and_data.

-spec do_push_tlog_entry(pid(), mria_rlog:entry()) -> ok.
Expand Down Expand Up @@ -558,3 +576,22 @@ perform_autoclean(Shard, Upstream) ->
|| Table <- mria_schema:tables_of_shard(Shard),
mria_schema:get_merged_table_auto_clean(Table)],
ok.

-spec broadcast_status_event(mria_rlog:shard(), state()) -> ok.
broadcast_status_event(Shard, Status) ->
try
MS = {{{Shard, '$1'}}, [], ['$1']},
Pids = ets:select(?replica_events_tab, [MS]),
Event = #mria_replica_status_update{ shard = Shard
, status = Status
},
[I ! Event || I <- Pids],
ok
Comment thread
ieQu1 marked this conversation as resolved.
catch
EC:Err:Stack ->
?tp(warning, mria_failed_to_broadcast_replica_event,
#{ EC => Err
, stacktrace => Stack
}),
ok
end.
61 changes: 61 additions & 0 deletions test/mria_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2312,6 +2312,67 @@ t_is_peer_alive(_) ->
end,
[]).

t_replica_state_events(_) ->
Shard = Table = ?FUNCTION_NAME,
Cluster = [S1, _S2] = mria_ct:cluster([core, replicant], mria_mnesia_test_util:common_env()),
?check_trace(
#{timetrap => 30000},
try
%% Prepare:
Self = self(),
[C1, R1] = mria_ct:start_cluster(mria, Cluster),
mria_mnesia_test_util:stabilize(1000),
%% Verify that core cannot subscribe to replica events:
?assertEqual(
{error, invalid_role},
?ON(C1, mria:subscribe_replica_events(Shard, Self))),
%% Subscribe to events for a new shard:
?assertEqual(
ok,
?ON(R1, mria:subscribe_replica_events(Shard, Self))),
%% Create a table in new shard:
[?ON(I,
begin
ok = mria:create_table(Table, [{rlog_shard, Shard}]),
ok = mria:wait_for_tables([Table])
end)
|| I <- [C1, R1]],
%% This should created series of events:
Comment thread
ieQu1 marked this conversation as resolved.
Outdated
ct:sleep(100),
?assertEqual(
[ #mria_replica_status_update{shard = t_replica_state_events, status = disconnected}
, #mria_replica_status_update{shard = t_replica_state_events, status = bootstrap}
, #mria_replica_status_update{shard = t_replica_state_events, status = local_replay}
, #mria_replica_status_update{shard = t_replica_state_events, status = normal}
],
mria_ct:mailbox()),
%% Shut down core node, replicant should react:
slave:stop(C1),
ct:sleep(1000),
?assertEqual(
[ #mria_replica_status_update{shard = t_replica_state_events, status = disconnected}
],
mria_ct:mailbox()),
%% Restart core:
C1 = mria_ct:start_slave(mria, S1),
ok = ?ON(C1, mria:create_table(Table, [{rlog_shard, Shard}])),
ok = ?ON(C1, mria:wait_for_tables([Table])),
ct:sleep(5_000),
?assertEqual(
[ #mria_replica_status_update{shard = t_replica_state_events, status = bootstrap}
, #mria_replica_status_update{shard = t_replica_state_events, status = local_replay}
, #mria_replica_status_update{shard = t_replica_state_events, status = normal}
],
mria_ct:mailbox()),
?assertEqual(
ok,
?ON(R1, mria:unsubscribe_replica_events(Shard, Self))),
ok
after
ok = mria_ct:teardown_cluster(Cluster)
end,
[]).

get_preferred_core_node(Shard, Replicant) ->
?ON(Replicant,
begin
Expand Down