Skip to content

Commit 22c2bb5

Browse files
committed
Reject shapes that fail DnfPlan on creation
1 parent d146a16 commit 22c2bb5

File tree

6 files changed

+95
-54
lines changed

6 files changed

+95
-54
lines changed

packages/sync-service/lib/electric/shapes/api/params.ex

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ defmodule Electric.Shapes.Api.Params do
33

44
alias Electric.Replication.LogOffset
55
alias Electric.Shapes.Api
6+
alias Electric.Shapes.DnfPlan
67
alias Electric.Shapes.Shape
78

89
import Ecto.Changeset
@@ -331,7 +332,13 @@ defmodule Electric.Shapes.Api.Params do
331332
log_mode: fetch_field!(changeset, :log)
332333
) do
333334
{:ok, shape} ->
334-
put_change(changeset, :shape_definition, shape)
335+
case DnfPlan.compile(shape) do
336+
{:error, reason} ->
337+
add_error(changeset, :where, reason)
338+
339+
_ok ->
340+
put_change(changeset, :shape_definition, shape)
341+
end
335342

336343
{:error, :connection_not_available} ->
337344
add_error(

packages/sync-service/lib/electric/shapes/dnf_plan.ex

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -537,16 +537,22 @@ defmodule Electric.Shapes.DnfPlan do
537537
with {:ok, decomposition} <- Decomposer.decompose(shape.where.eval) do
538538
positions = enrich_positions(decomposition.subexpressions, shape)
539539

540-
{:ok,
541-
%__MODULE__{
542-
disjuncts: decomposition.disjuncts,
543-
disjuncts_positions: decomposition.disjuncts_positions,
544-
position_count: decomposition.position_count,
545-
positions: positions,
546-
dependency_positions: build_dependency_positions(positions),
547-
dependency_disjuncts: build_dependency_disjuncts(decomposition.disjuncts, positions),
548-
dependency_polarities: build_dependency_polarities(positions)
549-
}}
540+
case build_dependency_polarities(positions) do
541+
{:ok, dependency_polarities} ->
542+
{:ok,
543+
%__MODULE__{
544+
disjuncts: decomposition.disjuncts,
545+
disjuncts_positions: decomposition.disjuncts_positions,
546+
position_count: decomposition.position_count,
547+
positions: positions,
548+
dependency_positions: build_dependency_positions(positions),
549+
dependency_disjuncts: build_dependency_disjuncts(decomposition.disjuncts, positions),
550+
dependency_polarities: dependency_polarities
551+
}}
552+
553+
{:error, _} = error ->
554+
error
555+
end
550556
end
551557
end
552558

@@ -621,17 +627,18 @@ defmodule Electric.Shapes.DnfPlan do
621627
fn {_pos, info} -> info.dependency_index end,
622628
fn {_pos, info} -> info.negated end
623629
)
624-
|> Map.new(fn {dep_index, negated_flags} ->
630+
|> Enum.reduce_while({:ok, %{}}, fn {dep_index, negated_flags}, {:ok, acc} ->
625631
case Enum.uniq(negated_flags) do
626632
[false] ->
627-
{dep_index, :positive}
633+
{:cont, {:ok, Map.put(acc, dep_index, :positive)}}
628634

629635
[true] ->
630-
{dep_index, :negated}
636+
{:cont, {:ok, Map.put(acc, dep_index, :negated)}}
631637

632-
mixed ->
633-
raise ArgumentError,
634-
"dependency #{dep_index} has inconsistent polarity across positions: #{inspect(mixed)}"
638+
_mixed ->
639+
{:halt,
640+
{:error,
641+
"a subquery dependency cannot be used with both positive and negative polarity in the same filter"}}
635642
end
636643
end)
637644
end

packages/sync-service/lib/electric/shapes/shape.ex

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ defmodule Electric.Shapes.Shape do
33
Struct describing the requested shape
44
"""
55
alias Electric.Shapes.Consumer.Subqueries
6-
alias Electric.Replication.Eval.Decomposer
76
alias Electric.Replication.Eval.Expr
87
alias Electric.Postgres.Inspector
98
alias Electric.Replication.Eval.Parser
@@ -234,8 +233,7 @@ defmodule Electric.Shapes.Shape do
234233
validate_selected_columns(column_info, pk_cols, supported_features, opts),
235234
refs = Inspector.columns_to_expr(column_info),
236235
{:ok, where, shape_dependencies} <-
237-
validate_where_clause(Map.get(opts, :where), opts, refs),
238-
:ok <- validate_dnf_decomposition(where, shape_dependencies) do
236+
validate_where_clause(Map.get(opts, :where), opts, refs) do
239237
flags =
240238
[
241239
if(is_nil(Map.get(opts, :columns)), do: :selects_all_columns),
@@ -306,15 +304,6 @@ defmodule Electric.Shapes.Shape do
306304
end
307305
end
308306

309-
defp validate_dnf_decomposition(_where, []), do: :ok
310-
defp validate_dnf_decomposition(nil, _deps), do: :ok
311-
312-
defp validate_dnf_decomposition(where, [_ | _]) do
313-
case Decomposer.decompose(where.eval) do
314-
{:ok, _} -> :ok
315-
{:error, reason} -> {:error, {:where, reason}}
316-
end
317-
end
318307

319308
defp check_feature_flag(subqueries, opts) do
320309
if subqueries != [] and

packages/sync-service/test/electric/plug/router_test.exs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2249,6 +2249,23 @@ defmodule Electric.Plug.RouterTest do
22492249
|> Router.call(opts)
22502250
end
22512251

2252+
@tag with_sql: [
2253+
"CREATE TABLE parent (id INT PRIMARY KEY, value INT NOT NULL)",
2254+
"CREATE TABLE child (id INT PRIMARY KEY, parent_id INT NOT NULL REFERENCES parent(id), value INT NOT NULL)"
2255+
]
2256+
test "return 400 if same subquery is used with both positive and negative polarity", %{
2257+
opts: opts
2258+
} do
2259+
assert %{status: 400} =
2260+
conn("GET", "/v1/shape", %{
2261+
table: "child",
2262+
offset: "-1",
2263+
where:
2264+
"parent_id IN (SELECT id FROM parent) OR NOT parent_id IN (SELECT id FROM parent)"
2265+
})
2266+
|> Router.call(opts)
2267+
end
2268+
22522269
@tag with_sql: [
22532270
"CREATE TABLE parent (id INT PRIMARY KEY, value INT NOT NULL)",
22542271
"CREATE TABLE child (id INT PRIMARY KEY, parent_id INT NOT NULL REFERENCES parent(id), value INT NOT NULL)",

packages/sync-service/test/electric/shapes/dnf_plan_test.exs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,33 @@ defmodule Electric.Shapes.DnfPlanTest do
206206
end
207207
end
208208

209+
describe "compile/1 - mixed polarity" do
210+
test "returns error when same subquery is used with both positive and negative polarity" do
211+
# Parse with 2 separate sublinks, then remap $sublink/1 -> $sublink/0
212+
# to simulate what Shape.new's canonicalize_where_sublink_refs does
213+
# when the same subquery appears with opposite polarity.
214+
{where, deps} =
215+
parse_where_with_sublinks(
216+
~S"x IN (SELECT id FROM dep1) OR NOT x IN (SELECT id FROM dep2)",
217+
2
218+
)
219+
220+
# Remap $sublink/1 refs to $sublink/0 in the AST, simulating deduplication
221+
remapped_eval = remap_sublink_ref(where.eval, "1", "0")
222+
223+
remapped_used_refs =
224+
where.used_refs
225+
|> Map.delete(["$sublink", "1"])
226+
227+
where = %{where | eval: remapped_eval, used_refs: remapped_used_refs}
228+
229+
# Only 1 dependency since both refs now point to the same sublink
230+
shape = make_shape(where, [hd(deps)])
231+
assert {:error, reason} = DnfPlan.compile(shape)
232+
assert reason =~ "positive and negative polarity"
233+
end
234+
end
235+
209236
describe "compile/1 - nested subqueries compile per level" do
210237
test "outer and inner shapes compile independently" do
211238
# Outer shape: x IN sq1 (where sq1 itself has subqueries)
@@ -823,4 +850,23 @@ defmodule Electric.Shapes.DnfPlanTest do
823850
shape_dependencies_handles: Enum.with_index(deps, fn _, i -> "dep_handle_#{i}" end)
824851
}
825852
end
853+
854+
# Recursively remap $sublink refs in an eval AST
855+
defp remap_sublink_ref(%Parser.Ref{path: ["$sublink", from]} = ref, from, to) do
856+
%{ref | path: ["$sublink", to]}
857+
end
858+
859+
defp remap_sublink_ref(%Parser.Func{args: args} = func, from, to) do
860+
%{func | args: Enum.map(args, &remap_sublink_ref(&1, from, to))}
861+
end
862+
863+
defp remap_sublink_ref(%Parser.Array{elements: elements} = arr, from, to) do
864+
%{arr | elements: Enum.map(elements, &remap_sublink_ref(&1, from, to))}
865+
end
866+
867+
defp remap_sublink_ref(%Parser.RowExpr{elements: elements} = row, from, to) do
868+
%{row | elements: Enum.map(elements, &remap_sublink_ref(&1, from, to))}
869+
end
870+
871+
defp remap_sublink_ref(other, _from, _to), do: other
826872
end

packages/sync-service/test/electric/shapes/shape_test.exs

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -882,31 +882,6 @@ defmodule Electric.Shapes.ShapeTest do
882882
params: params
883883
)
884884
end
885-
886-
@tag with_sql: [
887-
"CREATE TABLE IF NOT EXISTS project (id INT PRIMARY KEY, value INT NOT NULL)",
888-
"CREATE TABLE IF NOT EXISTS item (id INT PRIMARY KEY, value INT NOT NULL)"
889-
]
890-
test "rejects subquery shapes whose WHERE clause is too complex for DNF decomposition", %{
891-
inspector: inspector
892-
} do
893-
# Build a WHERE with >100 disjuncts by cross-producting ORs with ANDs,
894-
# plus a subquery to trigger the DNF validation path.
895-
# (a1 OR a2 OR ... OR a11) AND (b1 OR b2 OR ... OR b11) = 121 disjuncts
896-
group_a = Enum.map_join(1..11, " OR ", fn i -> "value = #{i}" end)
897-
group_b = Enum.map_join(12..22, " OR ", fn i -> "value = #{i}" end)
898-
899-
where =
900-
"(#{group_a}) AND (#{group_b}) AND value IN (SELECT value FROM project WHERE value > 5)"
901-
902-
assert {:error,
903-
{:where,
904-
"WHERE clause too complex for DNF decomposition" <> _rest}} =
905-
Shape.new("item",
906-
inspector: inspector,
907-
where: where
908-
)
909-
end
910885
end
911886

912887
describe "new!/2" do

0 commit comments

Comments
 (0)