Skip to content

Commit c4473b0

Browse files
authored
feat: remove full table info from shape definitions to lower mem (#2454)
## Reasons We have a process setup where we have 2 layers of supervisors which store entire child spec of their children as state, plus the `ShapeLogCollector` that the shape consumers register with, plus the consumers themselves -- all these processes happen to have shapes in their state. _Arguably, we can inject shapes into consumers bypassing supervisors, but that's a different PR._ These shapes, however, happened to know entire schema of the table they were defined against, which was duplicated a lot. This caused a lot of unnecessary memory usage when the table schema was sufficiently large. So, in numbers, for a 75 column table, without any transactions going through (just shape creation) Baseline, no shapes: 71kB - Current, 300 shapes - Total (for shape-related processes): 64.1MB - Total overhead in shared processes: 28MB - Total overhead in shared per shape: 93kB - Per-shape process total: 35.9MB - Per-shape 119kB - With this PR, 300 shapes - Total (for shape-related processes): 21MB - Total overhead in shared processes: 7.3MB - Total overhead in shared per shape: 24kB - Per-shape process total: 13MB - Per-shape 45kB So for a sufficiently large table, this PR drops used memory a lot, 60% in this particular case. Further optimizations of state are possible and we'll do them, but this is the lowest hanging fruit. ## Changes Shapes now don't store their table's entire schema. That's OK when processing transactions or querying the DB, but that's not enough to invalidate the shape when the schema changes, and it can change while Electric is down as well. To address this, now the burden of figuring out whether a schema-breaking change has happened from the incoming relations now lies upon `ShapeLogCollector`. It tracks all seen relation messages (which should be relatively rare unless PG is constantly disconnecting), and marks the columns that have changed within this relation as compared to a previously seen version. It also saves these last-seen relations when all shapes have reacted to it and invalidated themselves.
1 parent 6d925cc commit c4473b0

File tree

22 files changed

+733
-286
lines changed

22 files changed

+733
-286
lines changed

.changeset/big-candles-approve.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@core/sync-service": patch
3+
---
4+
5+
feat: lower the per-shape memory usage, especially for very large tables

packages/sync-service/lib/electric/replication/changes.ex

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,13 +177,14 @@ defmodule Electric.Replication.Changes do
177177

178178
defmodule Relation do
179179
@derive Jason.Encoder
180-
defstruct [:id, :schema, :table, :columns]
180+
defstruct [:id, :schema, :table, :columns, affected_columns: []]
181181

182182
@type t() :: %__MODULE__{
183183
id: Changes.relation_id(),
184184
schema: Changes.db_identifier(),
185185
table: Changes.db_identifier(),
186-
columns: [Column.t()]
186+
columns: [Column.t()],
187+
affected_columns: [Changes.db_identifier()]
187188
}
188189
end
189190

packages/sync-service/lib/electric/replication/eval/expr.ex

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ defmodule Electric.Replication.Eval.Expr do
33
Parsed expression, available for evaluation using the runner
44
"""
55

6+
alias Electric.Replication.Eval.Parser
67
alias Electric.Replication.Eval.Env
78

89
defstruct [:query, :eval, :used_refs, :returns]
@@ -37,4 +38,41 @@ defmodule Electric.Replication.Eval.Expr do
3738
|> Enum.filter(&match?({[_], _}, &1))
3839
|> Enum.map(fn {[key], _} -> key end)
3940
end
41+
42+
@doc false
43+
@spec to_json_safe(t()) :: map()
44+
def to_json_safe(%__MODULE__{query: query, used_refs: used_refs}) do
45+
%{
46+
version: 1,
47+
query: query,
48+
used_refs:
49+
Enum.map(used_refs, fn
50+
{k, v} when is_tuple(v) -> [k, Tuple.to_list(v)]
51+
{k, v} -> [k, v]
52+
end)
53+
}
54+
end
55+
56+
@doc false
57+
@spec from_json_safe(map()) :: {:ok, t()} | {:error, String.t()}
58+
def from_json_safe(%{"version" => 1, "query" => query, "used_refs" => refs}) do
59+
refs =
60+
Map.new(refs, fn
61+
[k, v] when is_list(v) -> {k, List.to_tuple(v)}
62+
[k, v] when is_binary(v) -> {k, String.to_existing_atom(v)}
63+
end)
64+
65+
Parser.parse_and_validate_expression(query, refs: refs)
66+
end
67+
68+
def from_json_safe(_),
69+
do: {:error, "Incorrect serialized format: keys must be `version`, `query`, `used_refs`"}
70+
end
71+
72+
defimpl Jason.Encoder, for: Electric.Replication.Eval.Expr do
73+
def encode(expr, opts) do
74+
expr
75+
|> Electric.Replication.Eval.Expr.to_json_safe()
76+
|> Jason.Encode.map(opts)
77+
end
4078
end

packages/sync-service/lib/electric/replication/persistent_replication_state.ex

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
defmodule Electric.Replication.PersistentReplicationState do
22
alias Electric.PersistentKV
33
alias Electric.Postgres.Lsn
4+
alias Electric.Replication.Changes
45
require Logger
56

67
@type opts() :: [
@@ -29,9 +30,33 @@ defmodule Electric.Replication.PersistentReplicationState do
2930
|> Lsn.from_integer()
3031
end
3132

33+
@base_tracked_relations %{
34+
table_to_id: %{},
35+
id_to_table_info: %{}
36+
}
37+
38+
@type tracked_relations :: %{
39+
table_to_id: %{{String.t(), String.t()} => Changes.relation_id()},
40+
id_to_table_info: %{Changes.relation_id() => Changes.Relation.t()}
41+
}
42+
43+
@spec set_tracked_relations(tracked_relations, opts()) :: :ok
44+
def set_tracked_relations(tracked_relations, opts) do
45+
set("tracked_relations", tracked_relations, opts)
46+
end
47+
48+
@spec get_tracked_relations(opts()) :: tracked_relations()
49+
def get_tracked_relations(opts) do
50+
case get("tracked_relations", opts) do
51+
{:ok, tracked_relations} -> tracked_relations
52+
{:error, :not_found} -> @base_tracked_relations
53+
end
54+
end
55+
3256
@spec reset(opts()) :: :ok
3357
def reset(opts) do
3458
set(@last_processed_lsn_key, 0, opts)
59+
set_tracked_relations(@base_tracked_relations, opts)
3560
end
3661

3762
@spec set(String.t(), any(), opts()) :: :ok

packages/sync-service/lib/electric/replication/publication_manager.ex

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,9 @@ defmodule Electric.Replication.PublicationManager do
150150

151151
@impl true
152152
def init(opts) do
153+
Logger.metadata(stack_id: Access.fetch!(opts, :stack_id))
154+
Process.set_label({:publication_manager, Access.fetch!(opts, :stack_id)})
155+
153156
state = %__MODULE__{
154157
relation_filter_counters: %{},
155158
prepared_relation_filters: %{},
@@ -437,7 +440,7 @@ defmodule Electric.Replication.PublicationManager do
437440
end
438441

439442
@spec get_selected_columns_for_shape(Shape.t()) :: MapSet.t(String.t() | nil)
440-
defp get_selected_columns_for_shape(%Shape{where: _, selected_columns: nil}),
443+
defp get_selected_columns_for_shape(%Shape{where: _, flags: %{selects_all_columns: true}}),
441444
do: MapSet.new([nil])
442445

443446
defp get_selected_columns_for_shape(%Shape{where: nil, selected_columns: columns}),
@@ -453,20 +456,8 @@ defmodule Electric.Replication.PublicationManager do
453456
MapSet.t(Electric.Replication.Eval.Expr.t() | nil)
454457
defp get_where_clauses_for_shape(%Shape{where: nil}), do: MapSet.new([nil])
455458
# TODO: flatten where clauses by splitting top level ANDs
456-
defp get_where_clauses_for_shape(%Shape{
457-
where: where,
458-
table_info: table_info,
459-
root_table: root_table
460-
}) do
461-
unqualified_refs = Expr.unqualified_refs(where)
462-
463-
table_info[root_table].columns
464-
|> Enum.filter(&(&1.name in unqualified_refs))
465-
|> Enum.any?(fn
466-
%{type_kind: kind} when kind in [:enum, :domain, :composite] -> true
467-
_ -> false
468-
end)
469-
|> if do
459+
defp get_where_clauses_for_shape(%Shape{where: where, flags: flags}) do
460+
if Map.get(flags, :non_primitive_columns_in_where, false) do
470461
MapSet.new([nil])
471462
else
472463
MapSet.new([where])

packages/sync-service/lib/electric/replication/shape_log_collector.ex

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ defmodule Electric.Replication.ShapeLogCollector do
66
use GenStage
77

88
require Electric.Postgres.Lsn
9+
alias Electric.Replication.ShapeLogCollector.AffectedColumns
910
alias Electric.Postgres.Lsn
1011
alias Electric.Replication.PersistentReplicationState
1112
alias Electric.Postgres.Inspector
@@ -67,13 +68,19 @@ defmodule Electric.Replication.ShapeLogCollector do
6768
persistent_kv: opts.persistent_kv
6869
]
6970

71+
{:ok, tracker_state} =
72+
persistent_replication_data_opts
73+
|> PersistentReplicationState.get_tracked_relations()
74+
|> AffectedColumns.init()
75+
7076
state =
7177
Map.merge(opts, %{
7278
producer: nil,
7379
subscriptions: {0, MapSet.new()},
7480
persistent_replication_data_opts: persistent_replication_data_opts,
7581
last_seen_lsn:
76-
PersistentReplicationState.get_last_processed_lsn(persistent_replication_data_opts)
82+
PersistentReplicationState.get_last_processed_lsn(persistent_replication_data_opts),
83+
tracked_relations: tracker_state
7784
})
7885

7986
# start in demand: :accumulate mode so that the ShapeCache is able to start
@@ -110,6 +117,12 @@ defmodule Electric.Replication.ShapeLogCollector do
110117
state.persistent_replication_data_opts
111118
)
112119

120+
:ok =
121+
PersistentReplicationState.set_tracked_relations(
122+
state.tracked_relations,
123+
state.persistent_replication_data_opts
124+
)
125+
113126
{:noreply, [], %{state | producer: nil}}
114127
end
115128

@@ -181,19 +194,29 @@ defmodule Electric.Replication.ShapeLogCollector do
181194
{:noreply, [txn], %{state | producer: from} |> put_last_seen_lsn(txn.lsn)}
182195
end
183196

184-
defp handle_relation(rel, _from, %{subscriptions: {0, _}} = state) do
185-
Logger.debug(fn ->
186-
"Dropping relation message for #{inspect(rel.schema)}.#{inspect(rel.table)}: no active consumers"
187-
end)
197+
defp handle_relation(rel, from, state) do
198+
OpenTelemetry.add_span_attributes("rel.is_dropped": false)
188199

189-
OpenTelemetry.add_span_attributes("rel.is_dropped": true)
200+
{updated_rel, tracker_state} =
201+
AffectedColumns.transform_relation(rel, state.tracked_relations)
190202

191-
{:reply, :ok, [], state}
192-
end
203+
case state do
204+
%{subscriptions: {0, _}} ->
205+
Logger.debug(fn ->
206+
"Dropping relation message for #{inspect(rel.schema)}.#{inspect(rel.table)}: no active consumers"
207+
end)
193208

194-
defp handle_relation(rel, from, state) do
195-
OpenTelemetry.add_span_attributes("rel.is_dropped": false)
196-
{:noreply, [rel], %{state | producer: from}}
209+
:ok =
210+
PersistentReplicationState.set_tracked_relations(
211+
state.tracked_relations,
212+
state.persistent_replication_data_opts
213+
)
214+
215+
{:reply, :ok, [], %{state | tracked_relations: tracker_state}}
216+
217+
_ ->
218+
{:noreply, [updated_rel], %{state | producer: from, tracked_relations: tracker_state}}
219+
end
197220
end
198221

199222
defp drop_transaction(txn, state) do
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
defmodule Electric.Replication.ShapeLogCollector.AffectedColumns do
2+
@moduledoc false
3+
4+
require Logger
5+
alias Electric.Replication.Changes.Relation
6+
7+
def init(%{id_to_table_info: id_to_table_info, table_to_id: table_to_id})
8+
when is_map(id_to_table_info) and is_map(table_to_id) do
9+
{:ok, %{id_to_table_info: id_to_table_info, table_to_id: table_to_id}}
10+
end
11+
12+
def transform_relation(
13+
%Relation{schema: schema, table: table, id: id} = rel,
14+
%{
15+
id_to_table_info: id_to_table_info,
16+
table_to_id: table_to_id
17+
} = state
18+
) do
19+
schema_table = {schema, table}
20+
21+
existing_id = Map.get(table_to_id, schema_table)
22+
existing_rel = Map.get(id_to_table_info, id)
23+
24+
case {existing_id, existing_rel} do
25+
# New relation, register it
26+
{nil, nil} ->
27+
{rel, add_relation(state, id, rel)}
28+
29+
# Relation identity matches known, let's compare columns
30+
{^id, %Relation{schema: ^schema, table: ^table}} ->
31+
case find_differing_columns(existing_rel, rel) do
32+
# No (noticable) changes to the relation, continue as-is
33+
[] ->
34+
{rel, state}
35+
36+
affected_cols ->
37+
updated_rel = %{rel | affected_columns: affected_cols}
38+
{updated_rel, add_relation(state, id, rel)}
39+
end
40+
41+
# Some part of identity changed, update the state and pass it through
42+
{_, _} ->
43+
Logger.debug(fn ->
44+
"Relation identity changed: #{existing_id}/#{inspect(existing_rel)} -> #{inspect(rel)}"
45+
end)
46+
47+
{rel,
48+
state
49+
|> delete_tracked_relation(schema_table_key(existing_rel), existing_id)
50+
|> add_relation(id, rel)}
51+
end
52+
end
53+
54+
defp schema_table_key(%Relation{schema: schema, table: table}), do: {schema, table}
55+
defp schema_table_key(nil), do: nil
56+
57+
defp add_relation(state, id, rel) do
58+
state
59+
|> put_in([:table_to_id, schema_table_key(rel)], id)
60+
|> put_in([:id_to_table_info, id], rel)
61+
end
62+
63+
defp delete_tracked_relation(state, schema_table, id) do
64+
state
65+
|> update_in([:table_to_id], &Map.delete(&1, schema_table))
66+
|> update_in([:id_to_table_info], &Map.delete(&1, id))
67+
end
68+
69+
defp find_differing_columns(%Relation{columns: old_cols}, %Relation{columns: new_cols})
70+
when old_cols == new_cols,
71+
do: []
72+
73+
defp find_differing_columns(%Relation{columns: old_cols}, %Relation{columns: new_cols}) do
74+
(old_cols ++ new_cols)
75+
|> Enum.reduce(%{}, fn
76+
%{name: name, type_oid: type_oid}, acc when is_map_key(acc, name) ->
77+
# We're seeing column with this name for a second time, so we can remove it from the diff if type oid is the same
78+
if acc[name] == type_oid, do: Map.delete(acc, name), else: acc
79+
80+
%{name: name, type_oid: type_oid}, acc ->
81+
# If we're seeing column with this name for a first time, it'll either stay if it's present only in one set,
82+
# or be deleted if seen again
83+
Map.put(acc, name, type_oid)
84+
end)
85+
|> Map.keys()
86+
end
87+
end

packages/sync-service/lib/electric/shape_cache/file_storage.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ defmodule Electric.ShapeCache.FileStorage do
167167

168168
with {:ok, shape_def_encoded} <- File.read(shape_def_path),
169169
{:ok, shape_def_json} <- Jason.decode(shape_def_encoded),
170-
shape = Electric.Shapes.Shape.from_json_safe!(shape_def_json) do
170+
{:ok, shape} <- Electric.Shapes.Shape.from_json_safe(shape_def_json) do
171171
Map.put(acc, shape_handle, shape)
172172
else
173173
# if the shape definition file cannot be read/decoded, just ignore it

packages/sync-service/lib/electric/shapes/api.ex

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
defmodule Electric.Shapes.Api do
2+
alias Electric.Postgres.Inspector
23
alias Electric.Replication.LogOffset
34
alias Electric.Shapes
45
alias Electric.Telemetry.OpenTelemetry
@@ -624,18 +625,25 @@ defmodule Electric.Shapes.Api do
624625
apply(encoder, type, [message])
625626
end
626627

627-
def schema(%Request{params: params}) do
628-
schema(params)
629-
end
630-
631-
def schema(%{shape_definition: %Shapes.Shape{} = shape}) do
632-
shape.table_info
633-
|> Map.fetch!(shape.root_table)
634-
|> Map.fetch!(:columns)
635-
|> Electric.Schema.from_column_info(shape.selected_columns)
628+
def schema(%Response{
629+
api: %Api{inspector: inspector},
630+
shape_definition: %Shapes.Shape{} = shape
631+
}) do
632+
# This technically does double work because we've already fetched this info to build the shape,
633+
# but that's not a big deal as it's all ETS backed. This also has an added benefit that
634+
# if table schema changes in a way that doesn't invalidate the shape or we can't detect
635+
# (e.g. column nullability changes but the type remains the same), we might return the new
636+
# version if it's invalidated in ETS or server is restarted.
637+
case Inspector.load_column_info(shape.root_table, inspector) do
638+
{:ok, columns} ->
639+
Electric.Schema.from_column_info(columns, shape.selected_columns)
640+
641+
:table_not_found ->
642+
nil
643+
end
636644
end
637645

638-
def schema(_) do
646+
def schema(_req) do
639647
nil
640648
end
641649

packages/sync-service/lib/electric/shapes/api/response.ex

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ defmodule Electric.Shapes.Api.Response do
5454
args
5555
|> Keyword.put_new(:status, 400)
5656
|> Keyword.put(:body, error_body(api, message, args))
57+
|> Keyword.put(:api, api)
5758

5859
struct(__MODULE__, opts)
5960
end
@@ -64,6 +65,7 @@ defmodule Electric.Shapes.Api.Response do
6465
|> Keyword.put_new(:status, 400)
6566
|> Keyword.put(:body, error_body(request, message, args))
6667
|> Keyword.put(:shape_definition, request.params.shape_definition)
68+
|> Keyword.put(:api, request.api)
6769

6870
struct(__MODULE__, opts)
6971
end

0 commit comments

Comments
 (0)