Skip to content

Commit d146a16

Browse files
committed
Set txn ids for buffered transactions
1 parent 3f88aa1 commit d146a16

File tree

2 files changed

+13
-4
lines changed

2 files changed

+13
-4
lines changed

packages/sync-service/lib/electric/shapes/consumer/event_handler/subqueries/buffering.ex

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -265,13 +265,22 @@ defmodule Electric.Shapes.Consumer.EventHandler.Subqueries.Buffering do
265265
Enum.reduce_while(txns, {:ok, []}, fn txn, {:ok, acc} ->
266266
case Subqueries.convert_transaction(txn, state, views) do
267267
{:error, :truncate} -> {:halt, {:error, {:truncate, txn.xid}}}
268-
{:ok, changes} -> {:cont, {:ok, acc ++ changes}}
268+
269+
{:ok, []} ->
270+
{:cont, {:ok, acc}}
271+
272+
{:ok, changes} ->
273+
{:cont,
274+
{:ok,
275+
[
276+
%LogOp.AppendChanges{changes: changes, default_xid: txn.xid}
277+
| acc
278+
]}}
269279
end
270280
end)
271281
|> case do
272282
{:error, _} = error -> error
273-
{:ok, []} -> {:ok, []}
274-
{:ok, changes} -> {:ok, [%LogOp.AppendChanges{changes: changes}]}
283+
{:ok, ops} -> {:ok, Enum.reverse(ops)}
275284
end
276285
end
277286

packages/sync-service/test/electric/shapes/consumer_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2083,7 +2083,7 @@ defmodule Electric.Shapes.ConsumerTest do
20832083
},
20842084
%{"headers" => %{"control" => "snapshot-end"}},
20852085
%{
2086-
"headers" => %{"operation" => "update"},
2086+
"headers" => %{"operation" => "update", "txids" => [100]},
20872087
"key" => ~s'"public"."test_table"/"1"'
20882088
}
20892089
] = get_log_items_from_storage(LogOffset.last_before_real_offsets(), shape_storage)

0 commit comments

Comments
 (0)