Skip to content

Commit 4c24123

Browse files
robacourtmsfstef
andauthored
Removing file writes from the main WAL processing loop (#2470)
Speed up replication processing by removing file writes from the main processing loop: - persisting the tracked relations now only happens on a relation change, not every transaction - `last_processed_lsn` is no longer persisted, but it is worked out from the last lsn seen in the shape logs. This also avoids a race condition where the lsn is written in the shape logs but not the PersistedReplicationState. As part of this PR, this also fixes a bug we found where `last_processed_lsn` was always being set to `Lsn.from_integer(0)` on start-up rather than reading from disk. Please note that as part of this PR, `last_processed_lsn` and `global_last_seen_lsn` no longer include LSNs from transactions that do not affect any shapes. Here we can see a 20% increase in WAL processing speed: <img width="811" alt="Screenshot 2025-03-24 at 15 11 29" src="https://github.com/user-attachments/assets/ee299705-504d-41a7-a244-57d44bad5e39" /> --------- Co-authored-by: msfstef <msfstef@gmail.com>
1 parent 931ec10 commit 4c24123

File tree

16 files changed

+282
-101
lines changed

16 files changed

+282
-101
lines changed

.changeset/honest-crabs-sing.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@core/sync-service": patch
3+
---
4+
5+
Speed up replication processing by removing file writes from the main processing loop

packages/sync-service/lib/electric/connection/manager.ex

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,8 @@ defmodule Electric.Connection.Manager do
332332
stack_id: state.stack_id,
333333
persistent_kv: state.persistent_kv
334334
)
335+
336+
Electric.LsnTracker.reset(state.stack_id)
335337
end
336338

337339
{:ok, shapes_sup_pid} =
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
defmodule Electric.LsnTracker do
2+
alias Electric.Postgres.Lsn
3+
4+
def init(last_processed_lsn, stack_id) do
5+
create_table(stack_id)
6+
7+
set_last_processed_lsn(last_processed_lsn, stack_id)
8+
end
9+
10+
@spec set_last_processed_lsn(Lsn.t() | non_neg_integer(), String.t()) :: :ok
11+
def set_last_processed_lsn(lsn, stack_id) when is_struct(lsn, Lsn) do
12+
stack_id
13+
|> table()
14+
|> :ets.insert({:last_processed_lsn, lsn})
15+
end
16+
17+
def set_last_processed_lsn(lsn, stack_id) when is_integer(lsn) do
18+
set_last_processed_lsn(Lsn.from_integer(lsn), stack_id)
19+
end
20+
21+
@spec get_last_processed_lsn(String.t()) :: Lsn.t()
22+
def get_last_processed_lsn(stack_id) do
23+
[last_processed_lsn: lsn] =
24+
stack_id
25+
|> table()
26+
|> :ets.lookup(:last_processed_lsn)
27+
28+
lsn
29+
end
30+
31+
def reset(stack_id) do
32+
set_last_processed_lsn(Lsn.from_integer(0), stack_id)
33+
end
34+
35+
defp create_table(stack_id) do
36+
stack_id
37+
|> table()
38+
|> :ets.new([:protected, :named_table])
39+
end
40+
41+
defp table(stack_id) do
42+
:"#{stack_id}:lsn_tracker"
43+
end
44+
end

packages/sync-service/lib/electric/postgres/lsn.ex

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,26 @@ defmodule Electric.Postgres.Lsn do
229229
|> from_integer()
230230
end
231231

232+
@doc """
233+
Returns the highest Lsn from the given list of Lsns.
234+
235+
When the list is empty, it returns #Lsn<0/0>.
236+
237+
## Examples
238+
iex> max([%#{Lsn}{segment: 0, offset: 1}, %#{Lsn}{segment: 0, offset: 2}])
239+
%#{Lsn}{segment: 0, offset: 2}
240+
241+
iex> max([%#{Lsn}{segment: 1, offset: 1}, %#{Lsn}{segment: 0, offset: 2}])
242+
%#{Lsn}{segment: 1, offset: 1}
243+
244+
iex> max([])
245+
%#{Lsn}{segment: 0, offset: 0}
246+
247+
"""
248+
@spec max(Enumerable.t(t())) :: t()
249+
def max([]), do: from_integer(0)
250+
def max(lsns) when is_list(lsns), do: Enum.max_by(lsns, &to_integer/1)
251+
232252
defimpl Inspect do
233253
def inspect(lsn, _opts) do
234254
"#Lsn<#{Electric.Postgres.Lsn.to_iolist(lsn)}>"

packages/sync-service/lib/electric/replication/log_offset.ex

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,27 @@ defmodule Electric.Replication.LogOffset do
6262

6363
def new(%__MODULE__{} = offset), do: offset
6464

65+
@doc """
66+
Returns the LSN part of the LogOffset.
67+
68+
## Examples
69+
70+
iex> extract_lsn(%LogOffset{tx_offset: 10, op_offset: 0})
71+
#Lsn<0/A>
72+
73+
iex> extract_lsn(%LogOffset{tx_offset: 10, op_offset: 5})
74+
#Lsn<0/A>
75+
76+
iex> extract_lsn(%LogOffset{tx_offset: 11, op_offset: 5})
77+
#Lsn<0/B>
78+
79+
iex> extract_lsn(LogOffset.before_all())
80+
#Lsn<0/0>
81+
"""
82+
@spec extract_lsn(t()) :: Lsn.t()
83+
def extract_lsn(%LogOffset{tx_offset: offset}) when offset < 0, do: Lsn.from_integer(0)
84+
def extract_lsn(%LogOffset{tx_offset: offset}), do: Lsn.from_integer(offset)
85+
6586
@doc """
6687
Compare two log offsets
6788

packages/sync-service/lib/electric/replication/persistent_replication_state.ex

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
defmodule Electric.Replication.PersistentReplicationState do
22
alias Electric.PersistentKV
3-
alias Electric.Postgres.Lsn
43
alias Electric.Replication.Changes
54
require Logger
65

@@ -9,27 +8,6 @@ defmodule Electric.Replication.PersistentReplicationState do
98
persistent_kv: Electric.PersistentKV.t()
109
]
1110

12-
@last_processed_lsn_key "last_processed_lsn"
13-
14-
@spec set_last_processed_lsn(Lsn.t() | non_neg_integer(), opts()) :: :ok
15-
def set_last_processed_lsn(lsn, opts) when is_struct(lsn, Lsn) do
16-
lsn |> Lsn.to_integer() |> set_last_processed_lsn(opts)
17-
end
18-
19-
def set_last_processed_lsn(lsn, opts) when is_integer(lsn) do
20-
Logger.debug("Updating last processed lsn to #{lsn}")
21-
set(@last_processed_lsn_key, lsn, opts)
22-
end
23-
24-
@spec get_last_processed_lsn(opts()) :: Lsn.t()
25-
def get_last_processed_lsn(opts) do
26-
case get(@last_processed_lsn_key, opts) do
27-
{:ok, last_processed_lsn} -> last_processed_lsn
28-
{:error, :not_found} -> 0
29-
end
30-
|> Lsn.from_integer()
31-
end
32-
3311
@base_tracked_relations %{
3412
table_to_id: %{},
3513
id_to_table_info: %{}
@@ -55,7 +33,6 @@ defmodule Electric.Replication.PersistentReplicationState do
5533

5634
@spec reset(opts()) :: :ok
5735
def reset(opts) do
58-
set(@last_processed_lsn_key, 0, opts)
5936
set_tracked_relations(@base_tracked_relations, opts)
6037
end
6138

packages/sync-service/lib/electric/replication/shape_log_collector.ex

Lines changed: 35 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ defmodule Electric.Replication.ShapeLogCollector do
66
use GenStage
77

88
require Electric.Postgres.Lsn
9+
alias Electric.LsnTracker
910
alias Electric.Replication.ShapeLogCollector.AffectedColumns
1011
alias Electric.Postgres.Lsn
1112
alias Electric.Replication.PersistentReplicationState
@@ -19,9 +20,7 @@ defmodule Electric.Replication.ShapeLogCollector do
1920
@schema NimbleOptions.new!(
2021
stack_id: [type: :string, required: true],
2122
inspector: [type: :mod_arg, required: true],
22-
persistent_kv: [type: :any, required: true],
23-
# see https://hexdocs.pm/gen_stage/GenStage.html#c:init/1-options
24-
demand: [type: {:in, [:forward, :accumulate]}, default: :accumulate]
23+
persistent_kv: [type: :any, required: true]
2524
)
2625

2726
def start_link(opts) do
@@ -34,6 +33,10 @@ defmodule Electric.Replication.ShapeLogCollector do
3433
Electric.ProcessRegistry.name(stack_id, __MODULE__)
3534
end
3635

36+
def start_processing(server, last_processed_lsn) do
37+
GenStage.call(server, {:start_processing, last_processed_lsn})
38+
end
39+
3740
# use `GenStage.call/2` here to make the event processing synchronous.
3841
#
3942
# Because `Electric.Shapes.Dispatcher` only sends demand to this producer
@@ -78,15 +81,13 @@ defmodule Electric.Replication.ShapeLogCollector do
7881
producer: nil,
7982
subscriptions: {0, MapSet.new()},
8083
persistent_replication_data_opts: persistent_replication_data_opts,
81-
last_seen_lsn:
82-
PersistentReplicationState.get_last_processed_lsn(persistent_replication_data_opts),
8384
tracked_relations: tracker_state
8485
})
8586

8687
# start in demand: :accumulate mode so that the ShapeCache is able to start
8788
# all active consumers before we start sending transactions
8889
{:producer, state,
89-
dispatcher: {Electric.Shapes.Dispatcher, inspector: state.inspector}, demand: opts.demand}
90+
dispatcher: {Electric.Shapes.Dispatcher, inspector: state.inspector}, demand: :accumulate}
9091
end
9192

9293
def handle_subscribe(:consumer, _opts, from, state) do
@@ -109,19 +110,12 @@ defmodule Electric.Replication.ShapeLogCollector do
109110
# last transaction and we can reply to the call and unblock the replication
110111
# client.
111112
def handle_demand(_demand, %{producer: producer} = state) do
112-
GenServer.reply(producer, :ok)
113-
114-
:ok =
115-
PersistentReplicationState.set_last_processed_lsn(
116-
state.last_seen_lsn,
117-
state.persistent_replication_data_opts
118-
)
113+
LsnTracker.set_last_processed_lsn(
114+
state.last_processed_lsn,
115+
state.stack_id
116+
)
119117

120-
:ok =
121-
PersistentReplicationState.set_tracked_relations(
122-
state.tracked_relations,
123-
state.persistent_replication_data_opts
124-
)
118+
GenServer.reply(producer, :ok)
125119

126120
{:noreply, [], %{state | producer: nil}}
127121
end
@@ -134,6 +128,12 @@ defmodule Electric.Replication.ShapeLogCollector do
134128
{:noreply, [], remove_subscription(from, state)}
135129
end
136130

131+
def handle_call({:start_processing, lsn}, _from, state) do
132+
LsnTracker.init(lsn, state.stack_id)
133+
GenStage.demand(self(), :forward)
134+
{:reply, :ok, [], Map.put(state, :last_processed_lsn, lsn)}
135+
end
136+
137137
def handle_call({:new_txn, %Transaction{xid: xid, lsn: lsn} = txn, trace_context}, from, state) do
138138
OpenTelemetry.set_current_context(trace_context)
139139

@@ -161,17 +161,17 @@ defmodule Electric.Replication.ShapeLogCollector do
161161
# will prompt the `GenServer.reply/2` call.
162162
defp handle_transaction(txn, _from, %{subscriptions: {0, _}} = state) do
163163
Logger.debug(fn -> "Dropping transaction #{txn.xid}: no active consumers" end)
164-
drop_transaction(txn, state)
164+
drop_transaction(state)
165165
end
166166

167167
# If we've already processed a transaction, then drop it without processing
168-
defp handle_transaction(txn, _from, %{last_seen_lsn: last_seen_lsn} = state)
169-
when not Lsn.is_larger(txn.lsn, last_seen_lsn) do
168+
defp handle_transaction(txn, _from, %{last_processed_lsn: last_processed_lsn} = state)
169+
when not Lsn.is_larger(txn.lsn, last_processed_lsn) do
170170
Logger.debug(fn ->
171-
"Dropping transaction #{txn.xid}: transaction LSN #{txn.lsn} smaller than last processed #{last_seen_lsn}"
171+
"Dropping transaction #{txn.xid}: transaction LSN #{txn.lsn} smaller than last processed #{last_processed_lsn}"
172172
end)
173173

174-
drop_transaction(txn, state)
174+
drop_transaction(state)
175175
end
176176

177177
defp handle_transaction(txn, from, state) do
@@ -191,7 +191,7 @@ defmodule Electric.Replication.ShapeLogCollector do
191191

192192
# we don't reply to this call. we only reply when we receive demand from
193193
# the consumers, signifying that every one has processed this txn
194-
{:noreply, [txn], %{state | producer: from} |> put_last_seen_lsn(txn.lsn)}
194+
{:noreply, [txn], %{state | producer: from} |> put_last_processed_lsn(txn.lsn)}
195195
end
196196

197197
defp handle_relation(rel, from, state) do
@@ -200,41 +200,27 @@ defmodule Electric.Replication.ShapeLogCollector do
200200
{updated_rel, tracker_state} =
201201
AffectedColumns.transform_relation(rel, state.tracked_relations)
202202

203+
:ok =
204+
PersistentReplicationState.set_tracked_relations(
205+
tracker_state,
206+
state.persistent_replication_data_opts
207+
)
208+
203209
case state do
204210
%{subscriptions: {0, _}} ->
205211
Logger.debug(fn ->
206212
"Dropping relation message for #{inspect(rel.schema)}.#{inspect(rel.table)}: no active consumers"
207213
end)
208214

209-
:ok =
210-
PersistentReplicationState.set_tracked_relations(
211-
state.tracked_relations,
212-
state.persistent_replication_data_opts
213-
)
214-
215215
{:reply, :ok, [], %{state | tracked_relations: tracker_state}}
216216

217217
_ ->
218218
{:noreply, [updated_rel], %{state | producer: from, tracked_relations: tracker_state}}
219219
end
220220
end
221221

222-
defp drop_transaction(txn, state) do
222+
defp drop_transaction(state) do
223223
OpenTelemetry.add_span_attributes("txn.is_dropped": true)
224-
225-
state =
226-
if Lsn.is_larger(txn.lsn, state.last_seen_lsn) do
227-
:ok =
228-
PersistentReplicationState.set_last_processed_lsn(
229-
txn.lsn,
230-
state.persistent_replication_data_opts
231-
)
232-
233-
put_last_seen_lsn(state, txn.lsn)
234-
else
235-
state
236-
end
237-
238224
{:reply, :ok, [], state}
239225
end
240226

@@ -261,9 +247,9 @@ defmodule Electric.Replication.ShapeLogCollector do
261247
state
262248
end
263249

264-
defp put_last_seen_lsn(%{last_seen_lsn: last_seen_lsn} = state, lsn)
265-
when Lsn.is_larger(lsn, last_seen_lsn),
266-
do: %{state | last_seen_lsn: lsn}
250+
defp put_last_processed_lsn(%{last_processed_lsn: last_processed_lsn} = state, lsn)
251+
when Lsn.is_larger(lsn, last_processed_lsn),
252+
do: %{state | last_processed_lsn: lsn}
267253

268-
defp put_last_seen_lsn(state, _lsn), do: state
254+
defp put_last_processed_lsn(state, _lsn), do: state
269255
end

0 commit comments

Comments
 (0)