Skip to content

Commit dcd8a9f

Browse files
feat: add old values to replica=full (#2381)
Co-authored-by: Sam Willis <sam.willis@gmail.com>
1 parent 7600746 commit dcd8a9f

File tree

9 files changed

+136
-8
lines changed

9 files changed

+136
-8
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@core/sync-service": patch
3+
---
4+
5+
feat: add `old_value` to updates under replica mode `full`

.changeset/twenty-sheep-swim.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@electric-sql/client": patch
3+
---
4+
5+
feat: add `old_value` to updates under replica mode `full`

packages/sync-service/lib/electric/log_items.ex

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,15 +65,15 @@ defmodule Electric.LogItems do
6565
{change.log_offset,
6666
%{
6767
key: change.key,
68-
value: update_values(change, pk_cols, replica),
6968
headers: %{
7069
operation: :update,
7170
txids: List.wrap(txids),
7271
relation: Tuple.to_list(change.relation),
7372
lsn: change.log_offset.tx_offset,
7473
op_position: change.log_offset.op_offset
7574
}
76-
}}
75+
}
76+
|> Map.merge(put_update_values(change, pk_cols, replica))}
7777
]
7878
end
7979

@@ -114,12 +114,26 @@ defmodule Electric.LogItems do
114114
defp take_pks_or_all(record, [], :default), do: record
115115
defp take_pks_or_all(record, pks, :default), do: Map.take(record, pks)
116116

117-
defp update_values(%{record: record, changed_columns: changed_columns}, pk_cols, :default) do
118-
Map.take(record, Enum.concat(pk_cols, changed_columns))
117+
defp put_update_values(%{record: record, changed_columns: changed_columns}, pk_cols, :default) do
118+
%{value: Map.take(record, Enum.concat(pk_cols, changed_columns))}
119+
end
120+
121+
defp put_update_values(
122+
%{record: record, old_record: old_record, changed_columns: changed_columns},
123+
_pk_cols,
124+
:full
125+
) do
126+
%{value: record, old_value: Map.take(old_record, MapSet.to_list(changed_columns))}
119127
end
120128

121-
defp update_values(%{record: record}, _pk_cols, :full) do
122-
record
129+
def merge_updates(u1, u2) when is_map_key(u1, "old_value") or is_map_key(u2, "old_value") do
130+
%{
131+
"key" => u1["key"],
132+
"headers" => Map.take(u1["headers"], ["operation", "relation"]),
133+
"value" => Map.merge(u1["value"], u2["value"]),
134+
# When merging old values, we give preference to the older u1
135+
"old_value" => Map.merge(u2["old_value"] || %{}, u1["old_value"] || %{})
136+
}
123137
end
124138

125139
def merge_updates(u1, u2) do

packages/sync-service/test/electric/log_item_test.exs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ defmodule Electric.LogItemsTest do
139139
{LogOffset.first(),
140140
%{
141141
value: %{"pk" => "10", "hello" => "world", "test" => "new"},
142+
old_value: %{"test" => "me"},
142143
key: "my_key",
143144
headers: %{
144145
relation: ["public", "test"],

packages/sync-service/test/electric/plug/router_test.exs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,57 @@ defmodule Electric.Plug.RouterTest do
618618
assert [_] = Jason.decode!(conn.resp_body)
619619
end
620620

621+
@tag with_sql: [
622+
"CREATE TABLE wide_table (id BIGINT PRIMARY KEY, value1 TEXT NOT NULL, value2 TEXT NOT NULL, value3 TEXT NOT NULL)",
623+
"INSERT INTO wide_table VALUES (1, 'test value 1', 'test value 1', 'test value 1')"
624+
]
625+
test "GET receives old rows in updates when replica=full", %{opts: opts, db_conn: db_conn} do
626+
conn =
627+
conn("GET", "/v1/shape?table=wide_table&offset=-1&replica=full") |> Router.call(opts)
628+
629+
assert %{status: 200} = conn
630+
shape_handle = get_resp_shape_handle(conn)
631+
json_body = Jason.decode!(conn.resp_body)
632+
633+
assert [
634+
%{
635+
"value" => %{"id" => _, "value1" => _, "value2" => _, "value3" => _},
636+
"key" => key
637+
}
638+
] = json_body
639+
640+
# Old value cannot be present on the snapshot
641+
refute match?(%{"old_value" => _}, json_body)
642+
643+
task =
644+
Task.async(fn ->
645+
conn(
646+
"GET",
647+
"/v1/shape?table=wide_table&offset=0_0&handle=#{shape_handle}&live&replica=full"
648+
)
649+
|> Router.call(opts)
650+
end)
651+
652+
Postgrex.query!(db_conn, "UPDATE wide_table SET value2 = 'test value 2' WHERE id = 1", [])
653+
654+
assert %{status: 200} = conn = Task.await(task)
655+
656+
# No extra keys should be present, so this is a pin
657+
value = %{
658+
"id" => "1",
659+
"value2" => "test value 2",
660+
"value1" => "test value 1",
661+
"value3" => "test value 1"
662+
}
663+
664+
old_value = %{
665+
"value2" => "test value 1"
666+
}
667+
668+
assert [%{"key" => ^key, "value" => ^value, "old_value" => ^old_value}, @up_to_date] =
669+
Jason.decode!(conn.resp_body)
670+
end
671+
621672
@tag additional_fields: "num INTEGER NOT NULL"
622673
@tag with_sql: [
623674
"INSERT INTO serial_ids (id, num) VALUES (1, 1), (2, 10)"

packages/sync-service/test/electric/shape_cache/file_storage/compaction_test.exs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,51 @@ defmodule Electric.ShapeCache.FileStorage.CompactionTest do
7878
|> length == 4
7979
end
8080

81+
test "compacts a log file with replica mode full", %{tmp_dir: tmp_dir} do
82+
log_file_path = Path.join(tmp_dir, "log_file")
83+
84+
log_stream =
85+
[
86+
ins(offset: {1, 1}, rec: [id: "key1", value: "value1"]),
87+
upd(offset: {2, 1}, rec: [id: "key1", value: {"value1", "value2"}]),
88+
ins(offset: {3, 1}, rec: [id: "key2", value: "value3"]),
89+
upd(offset: {4, 1}, rec: [id: "key1", value: {"value2", "value new 1"}]),
90+
upd(offset: {5, 1}, rec: [id: "key1", value: {"value new 1", "value new 2"}]),
91+
upd(offset: {6, 1}, rec: [id: "key1", value: {"value new 2", "value new 3"}]),
92+
upd(offset: {7, 1}, rec: [id: "key1", value: {"value new 3", "value new 4"}]),
93+
upd(offset: {8, 1}, rec: [id: "key1", value: {"value new 4", "value new 5"}]),
94+
del(offset: {9, 1}, rec: [id: "key2", value: "value"])
95+
]
96+
|> TestUtils.changes_to_log_items(replica: :full)
97+
98+
paths = LogFile.write_log_file(log_stream, log_file_path)
99+
100+
assert LogFile.read_chunk(paths, %LogOffset{tx_offset: 0, op_offset: 0})
101+
|> Enum.to_list()
102+
|> length == 9
103+
104+
assert {log_file_path, chunk_index_path, key_index_path} =
105+
Compaction.compact_in_place(paths, 1_000_000)
106+
107+
assert File.exists?(log_file_path)
108+
assert File.exists?(chunk_index_path)
109+
assert File.exists?(key_index_path)
110+
111+
assert [
112+
%{"headers" => %{"operation" => "insert"}},
113+
%{"headers" => %{"operation" => "insert"}},
114+
%{
115+
"headers" => %{"operation" => "update"},
116+
"value" => %{"id" => "key1", "value" => "value new 5"},
117+
"old_value" => %{"value" => "value1"}
118+
},
119+
%{"headers" => %{"operation" => "delete"}}
120+
] =
121+
LogFile.read_chunk(paths, %LogOffset{tx_offset: 0, op_offset: 0})
122+
|> Enum.to_list()
123+
|> Enum.map(&Jason.decode!/1)
124+
end
125+
81126
test "compacts a large enough log file full of updates (failing property)", %{
82127
tmp_dir: tmp_dir
83128
} do

packages/typescript-client/src/client.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ export interface PostgresParams {
6969
* changed columns in an update.
7070
*
7171
* If it's `full` Electric will send the entire row with both changed and
72-
* unchanged values.
72+
* unchanged values. `old_value` will also be present on update messages,
73+
* containing the previous value for changed columns.
7374
*
7475
* Setting `replica` to `full` will result in higher bandwidth
7576
* usage and so is not generally recommended.

packages/typescript-client/src/parser.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,12 @@ export class MessageParser<T extends Row<unknown>> {
104104
// is needed because there could be a column named `value`
105105
// and the value associated to that column will be a string or null.
106106
// But `typeof null === 'object'` so we need to make an explicit check.
107-
if (key === `value` && typeof value === `object` && value !== null) {
107+
// We also parse the `old_value`, which appears on updates when `replica=full`.
108+
if (
109+
(key === `value` || key === `old_value`) &&
110+
typeof value === `object` &&
111+
value !== null
112+
) {
108113
// Parse the row values
109114
const row = value as Record<string, Value<GetExtensions<T>>>
110115
Object.keys(row).forEach((key) => {

packages/typescript-client/src/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ export type ControlMessage = {
3232
export type ChangeMessage<T extends Row<unknown> = Row> = {
3333
key: string
3434
value: T
35+
old_value?: Partial<T> // Only provided for updates if `replica` is `full`
3536
headers: Header & { operation: Operation }
3637
}
3738

0 commit comments

Comments
 (0)