Skip to content

Commit c444072

Browse files
authored
feat(sync-service): index for array contains (#2359)
This PR seeks to address trigger.dev's WAL lag issues by adding two optimisations to where clause filtering: - Allow multiple conditions in a where clause to be optimised (not just one) - Optimise where clauses that have a condition in the form 'array_field @> array_const' ## Allow multiple conditions in a where clause to be optimised (not just one) This feature alone should halve the processing time for trigger.dev as they have many where clauses in the form `WHERE runtimeEnvironmentId = ? AND batchId = ?` where if a change matched the `runtimeEnvironmentId` it would then have to iterate through the `batchId`s. Now it doesn't - the `batchId` condition is indexed as well. ## Optimise where clauses that have a condition in the form 'array_field @> array_const' Trigger.dev's other problematic where clauses are in the form `WHERE runtimeEnvironmentId = ? AND runTags @> ?` and this feature optimises the `@>` operation. The algorithm for this optimisation can be seen in the [InclusionIndex module](https://github.com/electric-sql/electric/pull/2359/files#diff-44c9f3ec68658f9ea6690a7608b99ad0463d7dcc8f1c95e9a2617e1d3d30a3d2). Comparing the performance of this index against the current method shows quite a dramatic difference: <img width="707" alt="Screenshot 2025-02-19 at 19 42 59" src="https://github.com/user-attachments/assets/ea9de81c-50e7-45d1-a81f-d5a21c36daeb" /> Here you can see it performs well with 100k shapes (which for trigger.dev would be 100k shapes with the same `runtimeEnvironmentId`, in other words 100k shapes per user) <img width="692" alt="Screenshot 2025-02-19 at 19 43 47" src="https://github.com/user-attachments/assets/e2fde600-6c0f-4606-ac93-18daa8e50438" /> Both of the charts above are based on a shape array size of 3 and a change array size of 10 which is typical for trigger.dev. 
The algorithm seems to scale roughly linearly with change array size: <img width="714" alt="Screenshot 2025-02-24 at 16 12 34" src="https://github.com/user-attachments/assets/ce1703c1-9eea-4d19-bc44-8c92615f8da4" /> The algorithm seems to scale well with shape array size for a fixed change array size (10): <img width="714" alt="Screenshot 2025-02-24 at 16 14 35" src="https://github.com/user-attachments/assets/fcd6f956-8c01-4580-8e55-4b0eeb17e86f" />
1 parent f92d4b3 commit c444072

File tree

10 files changed

+752
-355
lines changed

10 files changed

+752
-355
lines changed

.changeset/mean-lies-kiss.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@core/sync-service": patch
3+
---
4+
5+
Allow multiple conditions in a where clause to be optimised (not just one)

.changeset/twelve-jeans-admire.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@core/sync-service": patch
3+
---
4+
5+
Optimise where clauses that have a condition in the form 'array_field @> array_const'

packages/sync-service/lib/electric/shapes/filter.ex

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ defmodule Electric.Shapes.Filter do
77
88
99
The `Filter` module keeps track of what tables are referenced by the shapes and changes and delegates
10-
the table specific logic to the `Filter.Table` module.
10+
the table specific logic to the `Filter.WhereCondition` module.
1111
"""
1212

1313
alias Electric.Replication.Changes.DeletedRecord
@@ -17,7 +17,7 @@ defmodule Electric.Shapes.Filter do
1717
alias Electric.Replication.Changes.TruncatedRelation
1818
alias Electric.Replication.Changes.UpdatedRecord
1919
alias Electric.Shapes.Filter
20-
alias Electric.Shapes.Filter.Table
20+
alias Electric.Shapes.Filter.WhereCondition
2121
alias Electric.Shapes.Shape
2222

2323
require Logger
@@ -45,9 +45,9 @@ defmodule Electric.Shapes.Filter do
4545
Map.update(
4646
tables,
4747
shape.root_table,
48-
Table.add_shape(Table.new(shape.root_table), {shape_id, shape}),
49-
fn table ->
50-
Table.add_shape(table, {shape_id, shape})
48+
WhereCondition.add_shape(WhereCondition.new(), {shape_id, shape}, shape.where),
49+
fn condition ->
50+
WhereCondition.add_shape(condition, {shape_id, shape}, shape.where)
5151
end
5252
)
5353
}
@@ -61,10 +61,10 @@ defmodule Electric.Shapes.Filter do
6161
%Filter{
6262
tables:
6363
tables
64-
|> Enum.map(fn {table_name, table} ->
65-
{table_name, Table.remove_shape(table, shape_id)}
64+
|> Enum.map(fn {table_name, condition} ->
65+
{table_name, WhereCondition.remove_shape(condition, shape_id)}
6666
end)
67-
|> Enum.reject(fn {_table, table} -> Table.empty?(table) end)
67+
|> Enum.reject(fn {_table, condition} -> WhereCondition.empty?(condition) end)
6868
|> Map.new()
6969
}
7070
end
@@ -134,14 +134,17 @@ defmodule Electric.Shapes.Filter do
134134

135135
defp shapes_affected_by_record(filter, table_name, record) do
136136
case Map.get(filter.tables, table_name) do
137-
nil -> MapSet.new()
138-
table -> Table.affected_shapes(table, record)
137+
nil ->
138+
MapSet.new()
139+
140+
condition ->
141+
WhereCondition.affected_shapes(condition, record)
139142
end
140143
end
141144

142145
defp all_shapes(%Filter{} = filter) do
143-
for {_table, table} <- filter.tables,
144-
{shape_id, shape} <- Table.all_shapes(table),
146+
for {_table, condition} <- filter.tables,
147+
{shape_id, shape} <- WhereCondition.all_shapes(condition),
145148
into: %{} do
146149
{shape_id, shape}
147150
end
@@ -150,7 +153,7 @@ defmodule Electric.Shapes.Filter do
150153
defp all_shapes_for_table(%Filter{} = filter, table_name) do
151154
case Map.get(filter.tables, table_name) do
152155
nil -> %{}
153-
table -> Table.all_shapes(table)
156+
condition -> WhereCondition.all_shapes(condition)
154157
end
155158
end
156159
end
Lines changed: 18 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,89 +1,26 @@
11
defmodule Electric.Shapes.Filter.Index do
22
@moduledoc """
3-
Responsible for knowing which shapes are affected by a change to a specific field.
3+
Efficiently finds shapes that are affected by a change, specifically for a particular operation in a where clause.
44
5-
The `%Table{}` struct contains `values` a map of values for a specific field to shapes that are affected by that field value.
6-
This acts as an index for the shapes, providing a fast way to know which shapes have been affected without having to
7-
iterate over all the shapes.
8-
9-
Currently only `=` operations are indexed.
5+
Each type of operation that has been optimised such as `=` or `@>` will have its own index module that implements the `Protocol` for this module.
106
"""
7+
alias Electric.Shapes.Filter.Index.Protocol
8+
alias Electric.Shapes.Filter.Indexes
119

12-
alias Electric.Replication.Eval.Env
13-
alias Electric.Shapes.Filter.Index
14-
alias Electric.Shapes.WhereClause
15-
alias Electric.Telemetry.OpenTelemetry
16-
require Logger
17-
18-
defstruct [:type, :values]
19-
20-
def new(type), do: %Index{type: type, values: %{}}
21-
22-
def empty?(%Index{values: values}), do: values == %{}
23-
24-
def add_shape(%Index{} = index, value, {shape_id, shape}, and_where) do
25-
shape_info = %{shape: shape, and_where: and_where}
26-
27-
%{
28-
index
29-
| values:
30-
Map.update(
31-
index.values,
32-
value,
33-
%{shape_id => shape_info},
34-
&Map.put(&1, shape_id, shape_info)
35-
)
36-
}
37-
end
10+
def new("=", type), do: Indexes.EqualityIndex.new(type)
11+
def new("@>", type), do: Indexes.InclusionIndex.new(type)
3812

39-
def remove_shape(%Index{} = index, shape_id) do
40-
%{
41-
index
42-
| values:
43-
index.values
44-
|> Map.new(fn {value, shapes} -> {value, Map.delete(shapes, shape_id)} end)
45-
|> Enum.reject(fn {_value, shapes} -> shapes == %{} end)
46-
|> Map.new()
47-
}
48-
end
49-
50-
def affected_shapes(%Index{values: values, type: type}, field, record) do
51-
case Map.get(values, value_from_record(record, field, type)) do
52-
nil ->
53-
MapSet.new()
54-
55-
shapes ->
56-
OpenTelemetry.with_span(
57-
"filter.index.filter_matched_shapes",
58-
[field: field, matched_shapes_count: map_size(shapes)],
59-
fn ->
60-
for {shape_id, shape} <- shapes,
61-
WhereClause.includes_record?(shape.and_where, record),
62-
into: MapSet.new() do
63-
shape_id
64-
end
65-
end
66-
)
67-
end
68-
end
69-
70-
@env Env.new()
71-
defp value_from_record(record, field, type) do
72-
case Env.parse_const(@env, record[field], type) do
73-
{:ok, value} ->
74-
value
75-
76-
:error ->
77-
raise RuntimeError,
78-
message: "Could not parse value for field #{inspect(field)} of type #{inspect(type)}"
79-
end
80-
end
13+
defdelegate empty?(index), to: Protocol
14+
defdelegate add_shape(index, value, shape_instance, and_where), to: Protocol
15+
defdelegate remove_shape(index, shape_id), to: Protocol
16+
defdelegate affected_shapes(index, field, record), to: Protocol
17+
defdelegate all_shapes(index), to: Protocol
18+
end
8119

82-
def all_shapes(%Index{values: values}) do
83-
for {_value, shapes} <- values,
84-
{shape_id, %{shape: shape}} <- shapes,
85-
into: %{} do
86-
{shape_id, shape}
87-
end
88-
end
20+
defprotocol Electric.Shapes.Filter.Index.Protocol do
21+
def empty?(index)
22+
def add_shape(index, value, shape_instance, and_where)
23+
def remove_shape(index, shape_id)
24+
def affected_shapes(index, field, record)
25+
def all_shapes(index)
8926
end
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
defmodule Electric.Shapes.Filter.Indexes.EqualityIndex do
2+
@moduledoc """
3+
Efficiently finds shapes that are affected by a change when the shape's where clause has `field = const` in it.
4+
5+
The index maps the values to the shapes that have that value as its const in the `field = const` condition.
6+
7+
Rather than directly adding shapes, shapes are added to a `%WhereCondition{}` which can contain multiple
8+
shapes and allows for further optimisations of other conditions in the shape's where clause.
9+
"""
10+
alias Electric.Replication.Eval.Env
11+
alias Electric.Shapes.Filter.Index
12+
alias Electric.Shapes.Filter.Indexes.EqualityIndex
13+
alias Electric.Shapes.Filter.WhereCondition
14+
require Logger
15+
16+
defstruct [:type, :values]
17+
18+
def new(type), do: %EqualityIndex{type: type, values: %{}}
19+
20+
defimpl Index.Protocol, for: EqualityIndex do
21+
def empty?(%EqualityIndex{values: values}), do: values == %{}
22+
23+
def add_shape(%EqualityIndex{} = index, value, {shape_id, shape}, and_where) do
24+
index.values
25+
|> Map.put_new(value, WhereCondition.new())
26+
|> Map.update!(value, &WhereCondition.add_shape(&1, {shape_id, shape}, and_where))
27+
|> then(&%{index | values: &1})
28+
end
29+
30+
def remove_shape(%EqualityIndex{} = index, shape_id) do
31+
index.values
32+
|> Enum.map(fn {value, condition} ->
33+
{value, WhereCondition.remove_shape(condition, shape_id)}
34+
end)
35+
|> Enum.reject(fn {_table, condition} -> WhereCondition.empty?(condition) end)
36+
|> Map.new()
37+
|> then(&%{index | values: &1})
38+
end
39+
40+
def affected_shapes(%EqualityIndex{values: values, type: type}, field, record) do
41+
case Map.get(values, value_from_record(record, field, type)) do
42+
nil ->
43+
MapSet.new()
44+
45+
condition ->
46+
WhereCondition.affected_shapes(condition, record)
47+
end
48+
end
49+
50+
@env Env.new()
51+
defp value_from_record(record, field, type) do
52+
case Env.parse_const(@env, record[field], type) do
53+
{:ok, value} ->
54+
value
55+
56+
:error ->
57+
raise RuntimeError,
58+
message: "Could not parse value for field #{inspect(field)} of type #{inspect(type)}"
59+
end
60+
end
61+
62+
def all_shapes(%EqualityIndex{values: values}) do
63+
for {_value, condition} <- values,
64+
{shape_id, shape} <- WhereCondition.all_shapes(condition),
65+
into: %{} do
66+
{shape_id, shape}
67+
end
68+
end
69+
end
70+
end

0 commit comments

Comments
 (0)