Skip to content

Commit c2b01c1

Browse files
authored
Fix LSN encoding in responses and handling in JS (#2446)
Follow-up to #2442 Randomizing the WAL start point in tests has highlighted problems with how we handle LSNs in Javascript. Javascript can only do max 53-bit number representations, after which it will start doing nonsense without telling you (gotta love it), but LSNs span the entire 64-bit range. In order to fix that, we encode the `lsn` and `global_last_seen_lsn` as strings in the responses rather than integers, so that Javascript can either compare them as strings or convert them to `BigInt`s, both would yield valid comparisons. JSON itself does not really care, and other languages are able to parse large integers correctly, but for the sake of portability in the current landscape I think it's better to keep it encoded only once and as a string, rather than e.g. twice as `lsn` and `lsn_str` - feels like overkill. I've kept the `op_position` as a number, because it would be really odd for a transaction to have `> 2^32` operations in it - maybe if the need arises or if we want consistency between `lsn` and `op_position` I can convert those as well. For the `MultiShapeStream`, I decided to parse the LSNs into `BigInt`s even though we don't really do any calculations with them and we could just compare them as strings, but I felt like this is more explicit, more easily understood while reading, and less error prone in the future, and the performance cost given that this is a primarily network-heavy utility I think is negligible, but I'm happy to change that if anyone feels strongly about this.
1 parent eb8167a commit c2b01c1

File tree

12 files changed

+125
-47
lines changed

12 files changed

+125
-47
lines changed

.changeset/wet-melons-cry.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@electric-sql/experimental": patch
3+
"@core/sync-service": patch
4+
---
5+
6+
Encode LSN as string in JSON responses for correct handling of large values (>53 bits) in Javascript.
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
export function bigIntMax(...args: Array<bigint | number>): bigint {
2+
return BigInt(args.reduce((m, e) => (e > m ? e : m)))
3+
}
4+
5+
export function bigIntMin(...args: Array<bigint | number>): bigint {
6+
return BigInt(args.reduce((m, e) => (e < m ? e : m)))
7+
}
8+
9+
export function bigIntCompare(a: bigint, b: bigint): 1 | -1 | 0 {
10+
return a > b ? 1 : a < b ? -1 : 0
11+
}

packages/experimental/src/multi-shape-stream.ts

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { bigIntCompare, bigIntMax, bigIntMin } from './bigint-utils'
12
import {
23
ShapeStream,
34
isChangeMessage,
@@ -125,8 +126,8 @@ export class MultiShapeStream<
125126
// We keep track of the last lsn of data and up-to-date messages for each shape
126127
// so that we can skip checkForUpdates if the lsn of the up-to-date message is
127128
// greater than the last lsn of data.
128-
#lastDataLsns: { [K in keyof TShapeRows]: number }
129-
#lastUpToDateLsns: { [K in keyof TShapeRows]: number }
129+
#lastDataLsns: { [K in keyof TShapeRows]: bigint }
130+
#lastUpToDateLsns: { [K in keyof TShapeRows]: bigint }
130131

131132
readonly #subscribers = new Map<
132133
number,
@@ -155,11 +156,11 @@ export class MultiShapeStream<
155156
])
156157
) as { [K in keyof TShapeRows]: ShapeStream<TShapeRows[K]> }
157158
this.#lastDataLsns = Object.fromEntries(
158-
Object.entries(shapes).map(([key]) => [key, -Infinity])
159-
) as { [K in keyof TShapeRows]: number }
159+
Object.entries(shapes).map(([key]) => [key, BigInt(-1)])
160+
) as { [K in keyof TShapeRows]: bigint }
160161
this.#lastUpToDateLsns = Object.fromEntries(
161-
Object.entries(shapes).map(([key]) => [key, -Infinity])
162-
) as { [K in keyof TShapeRows]: number }
162+
Object.entries(shapes).map(([key]) => [key, BigInt(-1)])
163+
) as { [K in keyof TShapeRows]: bigint }
163164
if (start) this.#start()
164165
}
165166

@@ -176,9 +177,13 @@ export class MultiShapeStream<
176177
// Whats the max lsn of the up-to-date messages?
177178
const upToDateLsns = messages
178179
.filter(isControlMessage)
179-
.map(({ headers }) => (headers.global_last_seen_lsn as number) ?? 0)
180+
.map(({ headers }) =>
181+
typeof headers.global_last_seen_lsn === `string`
182+
? BigInt(headers.global_last_seen_lsn)
183+
: BigInt(0)
184+
)
180185
if (upToDateLsns.length > 0) {
181-
const maxUpToDateLsn = Math.max(...upToDateLsns)
186+
const maxUpToDateLsn = bigIntMax(...upToDateLsns)
182187
const lastMaxUpToDateLsn = this.#lastUpToDateLsns[key]
183188
if (maxUpToDateLsn > lastMaxUpToDateLsn) {
184189
this.#lastUpToDateLsns[key] = maxUpToDateLsn
@@ -188,9 +193,11 @@ export class MultiShapeStream<
188193
// Whats the max lsn of the data messages?
189194
const dataLsns = messages
190195
.filter(isChangeMessage)
191-
.map(({ headers }) => (headers.lsn as number) ?? 0)
196+
.map(({ headers }) =>
197+
typeof headers.lsn === `string` ? BigInt(headers.lsn) : BigInt(0)
198+
)
192199
if (dataLsns.length > 0) {
193-
const maxDataLsn = Math.max(...dataLsns)
200+
const maxDataLsn = bigIntMax(...dataLsns)
194201
const lastMaxDataLsn = this.#lastDataLsns[key]
195202
if (maxDataLsn > lastMaxDataLsn) {
196203
this.#lastDataLsns[key] = maxDataLsn
@@ -224,7 +231,7 @@ export class MultiShapeStream<
224231
}
225232

226233
async #checkForUpdates() {
227-
const maxDataLsn = Math.max(...Object.values(this.#lastDataLsns))
234+
const maxDataLsn = bigIntMax(...Object.values(this.#lastDataLsns))
228235
const refreshPromises = this.#shapeEntries()
229236
.filter(([key]) => {
230237
// We only need to refresh shapes that have not seen an up-to-date message
@@ -374,20 +381,20 @@ export class TransactionalMultiShapeStream<
374381
[K: string]: Row<unknown>
375382
},
376383
> extends MultiShapeStream<TShapeRows> {
377-
#changeMessages = new Map<number, MultiShapeMessage<Row<unknown>, string>[]>()
384+
#changeMessages = new Map<bigint, MultiShapeMessage<Row<unknown>, string>[]>()
378385
#completeLsns: {
379-
[K in keyof TShapeRows]: number
386+
[K in keyof TShapeRows]: bigint
380387
}
381388

382389
constructor(options: MultiShapeStreamOptions<TShapeRows>) {
383390
super(options)
384391
this.#completeLsns = Object.fromEntries(
385-
Object.entries(options.shapes).map(([key]) => [key, -Infinity])
386-
) as { [K in keyof TShapeRows]: number }
392+
Object.entries(options.shapes).map(([key]) => [key, BigInt(-1)])
393+
) as { [K in keyof TShapeRows]: bigint }
387394
}
388395

389396
#getLowestCompleteLsn() {
390-
return Math.min(...Object.values(this.#completeLsns))
397+
return bigIntMin(...Object.values(this.#completeLsns))
391398
}
392399

393400
protected async _publish(
@@ -399,7 +406,7 @@ export class TransactionalMultiShapeStream<
399406
(lsn) => lsn <= lowestCompleteLsn
400407
)
401408
const messagesToPublish = lsnsToPublish
402-
.sort((a, b) => a - b)
409+
.sort((a, b) => bigIntCompare(a, b))
403410
.map((lsn) =>
404411
this.#changeMessages.get(lsn)?.sort((a, b) => {
405412
const { headers: aHeaders } = a
@@ -429,7 +436,8 @@ export class TransactionalMultiShapeStream<
429436
const { shape, headers } = message
430437
if (isChangeMessage(message)) {
431438
// The snapshot message does not have an lsn, so we use 0
432-
const lsn = typeof headers.lsn === `number` ? headers.lsn : 0
439+
const lsn =
440+
typeof headers.lsn === `string` ? BigInt(headers.lsn) : BigInt(0)
433441
if (!this.#changeMessages.has(lsn)) {
434442
this.#changeMessages.set(lsn, [])
435443
}
@@ -439,16 +447,16 @@ export class TransactionalMultiShapeStream<
439447
typeof headers.last === `boolean` &&
440448
headers.last === true
441449
) {
442-
this.#completeLsns[shape] = Math.max(this.#completeLsns[shape], lsn)
450+
this.#completeLsns[shape] = bigIntMax(this.#completeLsns[shape], lsn)
443451
}
444452
} else if (isControlMessage(message)) {
445453
if (headers.control === `up-to-date`) {
446-
if (typeof headers.global_last_seen_lsn !== `number`) {
454+
if (typeof headers.global_last_seen_lsn !== `string`) {
447455
throw new Error(`global_last_seen_lsn is not a number`)
448456
}
449-
this.#completeLsns[shape] = Math.max(
457+
this.#completeLsns[shape] = bigIntMax(
450458
this.#completeLsns[shape],
451-
headers.global_last_seen_lsn
459+
BigInt(headers.global_last_seen_lsn)
452460
)
453461
}
454462
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import { describe, expect, it } from 'vitest'
2+
import { bigIntCompare, bigIntMax, bigIntMin } from '../src/bigint-utils'
3+
describe(`bigIntMax`, () => {
4+
it(`should return the maximum of bigint and number arguments as a bigint`, () => {
5+
expect(bigIntMax(BigInt(1), BigInt(2), BigInt(3))).toBe(BigInt(3))
6+
expect(bigIntMax(5, 10, 2)).toBe(BigInt(10))
7+
expect(bigIntMax(BigInt(1), 2, BigInt(3), 4)).toBe(BigInt(4))
8+
})
9+
10+
it(`should return the only element as a bigint when there is one argument`, () => {
11+
expect(bigIntMax(BigInt(42))).toBe(BigInt(42))
12+
expect(bigIntMax(99)).toBe(BigInt(99))
13+
})
14+
15+
it(`should handle negative numbers and bigints`, () => {
16+
expect(bigIntMax(BigInt(-10), BigInt(-5), BigInt(-1))).toBe(BigInt(-1))
17+
expect(bigIntMax(-100, -50, -10)).toBe(BigInt(-10))
18+
})
19+
})
20+
21+
describe(`bigIntMin`, () => {
22+
it(`should return the minimum of bigint and number arguments as a bigint`, () => {
23+
expect(bigIntMin(BigInt(1), BigInt(2), BigInt(3))).toBe(BigInt(1))
24+
expect(bigIntMin(5, 10, 2)).toBe(BigInt(2))
25+
expect(bigIntMin(BigInt(1), 2, BigInt(3), 4)).toBe(BigInt(1))
26+
})
27+
28+
it(`should return the only element as a bigint when there is one argument`, () => {
29+
expect(bigIntMin(BigInt(42))).toBe(BigInt(42))
30+
expect(bigIntMin(99)).toBe(BigInt(99))
31+
})
32+
33+
it(`should handle negative numbers and bigints`, () => {
34+
expect(bigIntMin(BigInt(-10), BigInt(-5), BigInt(-1))).toBe(BigInt(-10))
35+
expect(bigIntMin(-100, -50, -10)).toBe(BigInt(-100))
36+
})
37+
})
38+
39+
describe(`bigIntCompare`, () => {
40+
it(`should return 1 when the first bigint is greater than the second`, () => {
41+
expect(bigIntCompare(BigInt(5), BigInt(3))).toBe(1)
42+
expect(bigIntCompare(BigInt(100), BigInt(99))).toBe(1)
43+
})
44+
45+
it(`should return -1 when the first bigint is less than the second`, () => {
46+
expect(bigIntCompare(BigInt(3), BigInt(5))).toBe(-1)
47+
expect(bigIntCompare(BigInt(99), BigInt(100))).toBe(-1)
48+
})
49+
50+
it(`should return 0 when both bigints are equal`, () => {
51+
expect(bigIntCompare(BigInt(42), BigInt(42))).toBe(0)
52+
expect(bigIntCompare(BigInt(0), BigInt(0))).toBe(0)
53+
})
54+
})

packages/experimental/test/multi-shape-stream.test.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,6 @@ describe(`TransactionalMultiShapeStream`, () => {
326326
multiShapeStream.subscribe((msgs: MultiShapeMessages<ShapeConfig>[]) => {
327327
messageGroups.push(msgs)
328328
if (multiShapeStream.isUpToDate) {
329-
console.log(`multiShapeStream.isUpToDate`)
330329
resolve()
331330
}
332331
})

packages/sync-service/lib/electric/log_items.ex

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ defmodule Electric.LogItems do
3535
operation: :insert,
3636
txids: List.wrap(txids),
3737
relation: Tuple.to_list(change.relation),
38-
lsn: change.log_offset.tx_offset,
38+
lsn: to_string(change.log_offset.tx_offset),
3939
op_position: change.log_offset.op_offset
4040
}
4141
}}
@@ -52,7 +52,7 @@ defmodule Electric.LogItems do
5252
operation: :delete,
5353
txids: List.wrap(txids),
5454
relation: Tuple.to_list(change.relation),
55-
lsn: change.log_offset.tx_offset,
55+
lsn: to_string(change.log_offset.tx_offset),
5656
op_position: change.log_offset.op_offset
5757
}
5858
}}
@@ -69,7 +69,7 @@ defmodule Electric.LogItems do
6969
operation: :update,
7070
txids: List.wrap(txids),
7171
relation: Tuple.to_list(change.relation),
72-
lsn: change.log_offset.tx_offset,
72+
lsn: to_string(change.log_offset.tx_offset),
7373
op_position: change.log_offset.op_offset
7474
}
7575
}
@@ -90,7 +90,7 @@ defmodule Electric.LogItems do
9090
txids: List.wrap(txids),
9191
relation: Tuple.to_list(change.relation),
9292
key_change_to: change.key,
93-
lsn: change.log_offset.tx_offset,
93+
lsn: to_string(change.log_offset.tx_offset),
9494
op_position: change.log_offset.op_offset
9595
}
9696
}},
@@ -103,7 +103,7 @@ defmodule Electric.LogItems do
103103
txids: List.wrap(txids),
104104
relation: Tuple.to_list(change.relation),
105105
key_change_from: change.old_key,
106-
lsn: new_offset.tx_offset,
106+
lsn: to_string(new_offset.tx_offset),
107107
op_position: new_offset.op_offset
108108
}
109109
}}

packages/sync-service/lib/electric/shapes/api.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -588,7 +588,7 @@ defmodule Electric.Shapes.Api do
588588
end
589589

590590
defp up_to_date_ctl(up_to_date_lsn) do
591-
%{headers: %{control: "up-to-date", global_last_seen_lsn: up_to_date_lsn}}
591+
%{headers: %{control: "up-to-date", global_last_seen_lsn: to_string(up_to_date_lsn)}}
592592
end
593593

594594
defp with_span(%Request{} = request, name, attributes \\ [], fun) do

packages/sync-service/test/electric/log_item_test.exs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ defmodule Electric.LogItemsTest do
2323
relation: ["public", "test"],
2424
operation: :insert,
2525
txids: [1],
26-
lsn: 0,
26+
lsn: "0",
2727
op_position: 0
2828
}
2929
}}
@@ -40,7 +40,7 @@ defmodule Electric.LogItemsTest do
4040
relation: ["public", "test"],
4141
operation: :insert,
4242
txids: [1],
43-
lsn: 0,
43+
lsn: "0",
4444
op_position: 0
4545
}
4646
}}
@@ -65,7 +65,7 @@ defmodule Electric.LogItemsTest do
6565
relation: ["public", "test"],
6666
operation: :delete,
6767
txids: [1],
68-
lsn: 0,
68+
lsn: "0",
6969
op_position: 0
7070
}
7171
}}
@@ -90,7 +90,7 @@ defmodule Electric.LogItemsTest do
9090
relation: ["public", "test"],
9191
operation: :delete,
9292
txids: [1],
93-
lsn: 0,
93+
lsn: "0",
9494
op_position: 0
9595
}
9696
}}
@@ -117,7 +117,7 @@ defmodule Electric.LogItemsTest do
117117
relation: ["public", "test"],
118118
operation: :update,
119119
txids: [1],
120-
lsn: 0,
120+
lsn: "0",
121121
op_position: 0
122122
}
123123
}}
@@ -145,7 +145,7 @@ defmodule Electric.LogItemsTest do
145145
relation: ["public", "test"],
146146
operation: :update,
147147
txids: [1],
148-
lsn: 0,
148+
lsn: "0",
149149
op_position: 0
150150
}
151151
}}
@@ -171,7 +171,7 @@ defmodule Electric.LogItemsTest do
171171
relation: ["public", "test"],
172172
operation: :delete,
173173
txids: [1],
174-
lsn: 0,
174+
lsn: "0",
175175
op_position: 0
176176
}
177177
}}
@@ -200,7 +200,7 @@ defmodule Electric.LogItemsTest do
200200
operation: :delete,
201201
txids: [1],
202202
key_change_to: "new_key",
203-
lsn: 0,
203+
lsn: "0",
204204
op_position: 0
205205
}
206206
}},
@@ -213,7 +213,7 @@ defmodule Electric.LogItemsTest do
213213
operation: :insert,
214214
txids: [1],
215215
key_change_from: "old_key",
216-
lsn: 0,
216+
lsn: "0",
217217
op_position: 1
218218
}
219219
}}
@@ -242,7 +242,7 @@ defmodule Electric.LogItemsTest do
242242
operation: :delete,
243243
txids: [1],
244244
key_change_to: "new_key",
245-
lsn: 0,
245+
lsn: "0",
246246
op_position: 0
247247
}
248248
}},
@@ -255,7 +255,7 @@ defmodule Electric.LogItemsTest do
255255
operation: :insert,
256256
txids: [1],
257257
key_change_from: "old_key",
258-
lsn: 0,
258+
lsn: "0",
259259
op_position: 1
260260
}
261261
}}

packages/sync-service/test/electric/plug/router_test.exs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -801,8 +801,8 @@ defmodule Electric.Plug.RouterTest do
801801

802802
# Verify that both ops share the same tx offset and differ in their op offset by a known
803803
# amount.
804-
op1_log_offset = LogOffset.new(op1_lsn, op1_op_position)
805-
op2_log_offset = LogOffset.new(op2_lsn, op2_op_position)
804+
op1_log_offset = LogOffset.new(String.to_integer(op1_lsn), op1_op_position)
805+
op2_log_offset = LogOffset.new(String.to_integer(op2_lsn), op2_op_position)
806806

807807
assert op2_log_offset == last_log_offset
808808

@@ -888,8 +888,8 @@ defmodule Electric.Plug.RouterTest do
888888

889889
# Verify that both ops share the same tx offset and differ in their op offset by a known
890890
# amount.
891-
op1_log_offset = LogOffset.new(op1_lsn, op1_op_position)
892-
op2_log_offset = LogOffset.new(op2_lsn, op2_op_position)
891+
op1_log_offset = LogOffset.new(String.to_integer(op1_lsn), op1_op_position)
892+
op2_log_offset = LogOffset.new(String.to_integer(op2_lsn), op2_op_position)
893893

894894
assert op2_log_offset == last_log_offset
895895

0 commit comments

Comments
 (0)