Skip to content

Commit b6b1328

Browse files
author
npub1mprnacetjua2xx3p5eddmhxyk6wv929ymm5py8kd2xfxurxahspqqlgyta
committed
fix(desktop): page reconnect replay windows
Co-authored-by: npub1mprnacetjua2xx3p5eddmhxyk6wv929ymm5py8kd2xfxurxahspqqlgyta <d8473ee32b973aa31a21a65adddcc4b69cc2a8a4dee8121ecd51926e0cddbc02@sprout-oss.stage.blox.sqprod.co> Signed-off-by: npub1mprnacetjua2xx3p5eddmhxyk6wv929ymm5py8kd2xfxurxahspqqlgyta <d8473ee32b973aa31a21a65adddcc4b69cc2a8a4dee8121ecd51926e0cddbc02@sprout-oss.stage.blox.sqprod.co>
1 parent 25001eb commit b6b1328

3 files changed

Lines changed: 319 additions & 39 deletions

File tree

desktop/src/shared/api/relayClientSession.ts

Lines changed: 17 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import {
2121
type RelaySubscription,
2222
type RelaySubscriptionFilter,
2323
} from "@/shared/api/relayClientShared";
24+
import { replayLiveSubscriptions } from "@/shared/api/relayReconnectReplay";
2425
import { RelayConnectionStateEmitter } from "@/shared/api/relayConnectionStateEmitter";
2526
import {
2627
shouldRefuseConnect,
@@ -31,7 +32,6 @@ import { buildThreadReferenceTags } from "@/features/messages/lib/threading";
3132

3233
const RECONNECT_BASE_DELAY_MS = 1_000,
3334
RECONNECT_MAX_DELAY_MS = 30_000,
34-
RECONNECT_REPLAY_SKEW_SECS = 5,
3535
EVENT_BATCH_MS = 16;
3636

3737
/**
@@ -165,7 +165,10 @@ export class RelayClient {
165165

166166
private async fetchHistory(filter: RelaySubscriptionFilter) {
167167
await this.ensureConnected();
168+
return this.requestHistory(filter);
169+
}
168170

171+
private requestHistory(filter: RelaySubscriptionFilter) {
169172
return new Promise<RelayEvent[]>((resolve, reject) => {
170173
const subId = `history-${crypto.randomUUID()}`;
171174
const timeout = window.setTimeout(() => {
@@ -860,45 +863,20 @@ export class RelayClient {
860863
return false;
861864
}
862865

863-
private buildReplayFilter(filter: RelaySubscriptionFilter, since?: number) {
864-
if (since === undefined) {
865-
return filter;
866-
}
867-
868-
return {
869-
...filter,
870-
since: filter.since === undefined ? since : Math.max(filter.since, since),
871-
};
872-
}
873-
874866
private async replayLiveSubscriptions() {
875-
for (const [subId, subscription] of this.subscriptions) {
876-
if (subscription.mode !== "live") {
877-
continue;
878-
}
879-
880-
const replaySince =
881-
subscription.lastSeenCreatedAt === undefined
882-
? undefined
883-
: Math.max(
884-
0,
885-
subscription.lastSeenCreatedAt - RECONNECT_REPLAY_SKEW_SECS,
886-
);
887-
888-
try {
889-
await this.sendRaw([
890-
"REQ",
891-
subId,
892-
this.buildReplayFilter(subscription.filter, replaySince),
893-
]);
894-
} catch (error) {
895-
const reconnectError =
896-
error instanceof Error
897-
? error
898-
: new Error("Failed to restore relay subscriptions.");
899-
this.resetConnection(reconnectError);
900-
throw reconnectError;
901-
}
867+
try {
868+
await replayLiveSubscriptions({
869+
subscriptions: this.subscriptions,
870+
sendRaw: (payload) => this.sendRaw(payload),
871+
requestHistory: (filter) => this.requestHistory(filter),
872+
});
873+
} catch (error) {
874+
const reconnectError =
875+
error instanceof Error
876+
? error
877+
: new Error("Failed to restore relay subscriptions.");
878+
this.resetConnection(reconnectError);
879+
throw reconnectError;
902880
}
903881
}
904882

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
import assert from "node:assert/strict";
2+
import test from "node:test";
3+
4+
import {
5+
buildReconnectReplayFilter,
6+
replayLiveSubscriptions,
7+
} from "./relayReconnectReplay.ts";
8+
import { RelayClient } from "./relayClientSession.ts";
9+
10+
function replayFilter(filter, since, until) {
11+
return buildReconnectReplayFilter(filter, since, until);
12+
}
13+
14+
function event(id, createdAt) {
15+
return {
16+
id,
17+
pubkey: "pubkey",
18+
created_at: createdAt,
19+
kind: 9,
20+
tags: [],
21+
content: "",
22+
sig: "sig",
23+
};
24+
}
25+
26+
function eventRange(prefix, start, count) {
27+
return Array.from({ length: count }, (_, index) =>
28+
event(`${prefix}-${index}`, start + index),
29+
);
30+
}
31+
32+
test("reconnect replay preserves small steady-state limits when adding since", () => {
33+
const filter = {
34+
kinds: [9, 40002],
35+
"#h": ["channel-1"],
36+
limit: 50,
37+
};
38+
39+
assert.deepEqual(replayFilter(filter, 123), {
40+
kinds: [9, 40002],
41+
"#h": ["channel-1"],
42+
limit: 50,
43+
since: 123,
44+
});
45+
});
46+
47+
test("reconnect replay caps large steady-state limits", () => {
48+
const filter = {
49+
kinds: [9],
50+
"#h": ["channel-1"],
51+
limit: 1000,
52+
};
53+
54+
assert.deepEqual(replayFilter(filter, 123), {
55+
kinds: [9],
56+
"#h": ["channel-1"],
57+
limit: 500,
58+
since: 123,
59+
});
60+
});
61+
62+
test("reconnect replay keeps the stricter existing since window", () => {
63+
const filter = {
64+
kinds: [9],
65+
"#h": ["channel-1"],
66+
limit: 50,
67+
since: 200,
68+
};
69+
70+
assert.deepEqual(replayFilter(filter, 123), {
71+
kinds: [9],
72+
"#h": ["channel-1"],
73+
limit: 50,
74+
since: 200,
75+
});
76+
});
77+
78+
test("reconnect replay applies the stricter until window", () => {
79+
const filter = {
80+
kinds: [9],
81+
"#h": ["channel-1"],
82+
limit: 50,
83+
until: 300,
84+
};
85+
86+
assert.deepEqual(replayFilter(filter, 123, 400), {
87+
kinds: [9],
88+
"#h": ["channel-1"],
89+
limit: 50,
90+
since: 123,
91+
until: 300,
92+
});
93+
});
94+
95+
test("initial subscription replay preserves the original filter", () => {
96+
const filter = {
97+
kinds: [9],
98+
"#h": ["channel-1"],
99+
limit: 50,
100+
};
101+
102+
assert.equal(replayFilter(filter, undefined), filter);
103+
});
104+
105+
test("channel reconnect replay pages the missed window until a short page", async () => {
106+
const delivered = [];
107+
const historyFilters = [];
108+
const sentPayloads = [];
109+
const pages = [
110+
eventRange("newest", 1501, 500),
111+
eventRange("middle", 1002, 500),
112+
eventRange("oldest", 995, 8),
113+
];
114+
const client = new RelayClient();
115+
const filter = client.buildChannelFilter("channel-1", 50);
116+
const subscriptions = new Map([
117+
[
118+
"live-1",
119+
{
120+
mode: "live",
121+
filter,
122+
onEvent: (event) => delivered.push(event),
123+
lastSeenCreatedAt: 1000,
124+
},
125+
],
126+
]);
127+
128+
await replayLiveSubscriptions({
129+
subscriptions,
130+
now: 2000,
131+
sendRaw: async (payload) => {
132+
sentPayloads.push(payload);
133+
},
134+
requestHistory: async (filter) => {
135+
historyFilters.push(filter);
136+
return pages.shift() ?? [];
137+
},
138+
});
139+
140+
assert.deepEqual(sentPayloads, [
141+
[
142+
"REQ",
143+
"live-1",
144+
{
145+
kinds: filter.kinds,
146+
"#h": ["channel-1"],
147+
limit: 50,
148+
since: 1995,
149+
},
150+
],
151+
]);
152+
assert.deepEqual(historyFilters, [
153+
{
154+
kinds: filter.kinds,
155+
"#h": ["channel-1"],
156+
limit: 500,
157+
since: 995,
158+
until: 2000,
159+
},
160+
{
161+
kinds: filter.kinds,
162+
"#h": ["channel-1"],
163+
limit: 500,
164+
since: 995,
165+
until: 1501,
166+
},
167+
{
168+
kinds: filter.kinds,
169+
"#h": ["channel-1"],
170+
limit: 500,
171+
since: 995,
172+
until: 1002,
173+
},
174+
]);
175+
assert.equal(delivered.length, 1008);
176+
});
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
import { CHANNEL_EVENT_KINDS } from "@/shared/constants/kinds";
2+
import type {
3+
RelaySubscription,
4+
RelaySubscriptionFilter,
5+
} from "@/shared/api/relayClientShared";
6+
import type { RelayEvent } from "@/shared/api/types";
7+
8+
const RECONNECT_REPLAY_SKEW_SECS = 5;
9+
export const RECONNECT_REPLAY_PAGE_LIMIT = 500;
10+
11+
export function buildReconnectReplayFilter(
12+
filter: RelaySubscriptionFilter,
13+
since?: number,
14+
until?: number,
15+
limit = Math.min(filter.limit, RECONNECT_REPLAY_PAGE_LIMIT),
16+
) {
17+
if (since === undefined) return filter;
18+
19+
const replayFilter: RelaySubscriptionFilter = {
20+
...filter,
21+
limit,
22+
since: filter.since === undefined ? since : Math.max(filter.since, since),
23+
};
24+
25+
if (until !== undefined) {
26+
replayFilter.until =
27+
filter.until === undefined ? until : Math.min(filter.until, until);
28+
}
29+
30+
return replayFilter;
31+
}
32+
33+
export function shouldPageReconnectReplay(filter: RelaySubscriptionFilter) {
34+
return (
35+
filter.limit > 0 &&
36+
Array.isArray(filter["#h"]) &&
37+
filter["#h"].length === 1 &&
38+
CHANNEL_EVENT_KINDS.every((kind) => filter.kinds.includes(kind))
39+
);
40+
}
41+
42+
export async function replayReconnectHistoryPages({
43+
subscription,
44+
since,
45+
until,
46+
isActive,
47+
requestHistory,
48+
}: {
49+
subscription: Extract<RelaySubscription, { mode: "live" }>;
50+
since: number;
51+
until: number;
52+
isActive: () => boolean;
53+
requestHistory: (filter: RelaySubscriptionFilter) => Promise<RelayEvent[]>;
54+
}) {
55+
let pageUntil = until;
56+
57+
while (pageUntil >= since) {
58+
if (!isActive()) return;
59+
60+
const events = await requestHistory(
61+
buildReconnectReplayFilter(
62+
subscription.filter,
63+
since,
64+
pageUntil,
65+
RECONNECT_REPLAY_PAGE_LIMIT,
66+
),
67+
);
68+
69+
if (!isActive()) return;
70+
71+
for (const event of events) subscription.onEvent(event);
72+
if (events.length < RECONNECT_REPLAY_PAGE_LIMIT) return;
73+
74+
const oldestCreatedAt = events[0]?.created_at;
75+
if (oldestCreatedAt === undefined || oldestCreatedAt <= since) return;
76+
77+
pageUntil =
78+
oldestCreatedAt < pageUntil ? oldestCreatedAt : oldestCreatedAt - 1;
79+
}
80+
}
81+
82+
export async function replayLiveSubscriptions({
83+
subscriptions,
84+
sendRaw,
85+
requestHistory,
86+
now = Math.floor(Date.now() / 1_000),
87+
}: {
88+
subscriptions: Map<string, RelaySubscription>;
89+
sendRaw: (payload: unknown[]) => Promise<void>;
90+
requestHistory: (filter: RelaySubscriptionFilter) => Promise<RelayEvent[]>;
91+
now?: number;
92+
}) {
93+
for (const [subId, subscription] of subscriptions) {
94+
if (subscription.mode !== "live") continue;
95+
96+
const replaySince =
97+
subscription.lastSeenCreatedAt === undefined
98+
? undefined
99+
: Math.max(
100+
0,
101+
subscription.lastSeenCreatedAt - RECONNECT_REPLAY_SKEW_SECS,
102+
);
103+
const shouldPageReplay =
104+
replaySince !== undefined &&
105+
shouldPageReconnectReplay(subscription.filter);
106+
const liveReplaySince = shouldPageReplay
107+
? Math.max(0, now - RECONNECT_REPLAY_SKEW_SECS)
108+
: replaySince;
109+
110+
await sendRaw([
111+
"REQ",
112+
subId,
113+
buildReconnectReplayFilter(subscription.filter, liveReplaySince),
114+
]);
115+
116+
if (shouldPageReplay) {
117+
await replayReconnectHistoryPages({
118+
subscription,
119+
since: replaySince,
120+
until: now,
121+
isActive: () => subscriptions.get(subId) === subscription,
122+
requestHistory,
123+
});
124+
}
125+
}
126+
}

0 commit comments

Comments
 (0)