Skip to content

Commit df690e9

Browse files
KyleAMathewsclaude
andcommitted
test(client): add error-path-publish static analysis rule and fix flaky SSE test
Add new static analysis rule detecting #publish/#onMessages calls inside catch blocks or HTTP error status handlers — catches the Bug #4 pattern (publishing stale 409 data to subscribers). RED/GREEN verified. Also: update SPEC.md loop-back site line numbers, fix 409 handler comment, DRY improvements to model-based tests, fix flaky SSE fallback test (guard controller.close() against already-closed stream, widen SSE request count tolerance). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent fe3c84f commit df690e9

File tree

6 files changed

+300
-94
lines changed

6 files changed

+300
-94
lines changed

packages/typescript-client/SPEC.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -360,12 +360,12 @@ Six sites in `client.ts` recurse or loop to issue a new fetch:
360360

361361
| # | Site | Line | Trigger | URL changes because | Guard |
362362
| --- | --------------------------------------- | ---- | ---------------------------------------------------------- | ------------------------------------------------------------------------------- | ------------------------------------------------------ |
363-
| L1 | `#requestShape``#requestShape` | 940 | Normal completion after `#fetchShape()` | Offset advances from response headers | `#checkFastLoop` (non-live) |
364-
| L2 | `#requestShape` catch → `#requestShape` | 874 | Abort with `FORCE_DISCONNECT_AND_REFRESH` or `SYSTEM_WAKE` | `isRefreshing` flag changes `canLongPoll`, affecting `live` param | Abort signals are discrete events |
365-
| L3 | `#requestShape` catch → `#requestShape` | 886 | `StaleCacheError` thrown by `#onInitialResponse` | `StaleRetryState` adds `cache_buster` param | `maxStaleCacheRetries` counter in state machine |
366-
| L4 | `#requestShape` catch → `#requestShape` | 924 | HTTP 409 (shape rotation) | `#reset()` sets offset=-1 + new handle; unconditional cache buster on every 409 | New handle + unique retry URL via cache buster |
367-
| L5 | `#start` catch → `#start` | 782 | Exception + `onError` returns retry opts | Params/headers merged from `retryOpts` | User-controlled; `#checkFastLoop` on next iteration |
368-
| L6 | `fetchSnapshot` catch → `fetchSnapshot` | 1975 | HTTP 409 on snapshot fetch | New handle via `withHandle()`; unconditional cache buster on every 409 | `#maxSnapshotRetries` (5) + unconditional cache buster |
363+
| L1 | `#requestShape``#requestShape` | 939 | Normal completion after `#fetchShape()` | Offset advances from response headers | `#checkFastLoop` (non-live) |
364+
| L2 | `#requestShape` catch → `#requestShape` | 883 | Abort with `FORCE_DISCONNECT_AND_REFRESH` or `SYSTEM_WAKE` | `isRefreshing` flag changes `canLongPoll`, affecting `live` param | Abort signals are discrete events |
365+
| L3 | `#requestShape` catch → `#requestShape` | 895 | `StaleCacheError` thrown by `#onInitialResponse` | `StaleRetryState` adds `cache_buster` param | `maxStaleCacheRetries` counter in state machine |
366+
| L4 | `#requestShape` catch → `#requestShape` | 923 | HTTP 409 (shape rotation) | `#reset()` sets offset=-1 + new handle; unconditional cache buster on every 409 | New handle + unique retry URL via cache buster |
367+
| L5 | `#start` catch → `#start` | 775 | Exception + `onError` returns retry opts | Params/headers merged from `retryOpts` | User-controlled; `#checkFastLoop` on next iteration |
368+
| L6 | `fetchSnapshot` catch → `fetchSnapshot` | 1937 | HTTP 409 on snapshot fetch | New handle via `withHandle()`; unconditional cache buster on every 409 | `#maxSnapshotRetries` (5) + unconditional cache buster |
369369

370370
### Guard mechanisms
371371

packages/typescript-client/bin/lib/shape-stream-static-analysis.mjs

Lines changed: 183 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ export function analyzeTypeScriptClient(options = {}) {
119119
unboundedRetryReport: clientAnalysis.unboundedRetryReport,
120120
cacheBusterReport: clientAnalysis.cacheBusterReport,
121121
tailPositionAwaitReport: clientAnalysis.tailPositionAwaitReport,
122+
errorPathPublishReport: clientAnalysis.errorPathPublishReport,
122123
ignoredActionReport: stateMachineAnalysis.ignoredActionReport,
123124
protocolLiteralReport: protocolLiteralAnalysis.report,
124125
},
@@ -150,6 +151,10 @@ export function analyzeShapeStreamClient(filePath = CLIENT_FILE) {
150151
classDecl,
151152
recursiveMethods
152153
)
154+
const errorPathPublishReport = buildErrorPathPublishReport(
155+
sourceFile,
156+
classDecl
157+
)
153158
const findings = sharedFieldReport
154159
.filter((report) => report.risky)
155160
.map((report) => ({
@@ -223,7 +228,13 @@ export function analyzeShapeStreamClient(filePath = CLIENT_FILE) {
223228
locations: [
224229
{ file: filePath, line: entry.statusCheckLine, label: `409 check` },
225230
...(entry.cacheBusterLine
226-
? [{ file: filePath, line: entry.cacheBusterLine, label: `conditional cache buster` }]
231+
? [
232+
{
233+
file: filePath,
234+
line: entry.cacheBusterLine,
235+
label: `conditional cache buster`,
236+
},
237+
]
227238
: []),
228239
{ file: filePath, line: entry.retryLine, label: `retry call` },
229240
],
@@ -249,6 +260,27 @@ export function analyzeShapeStreamClient(filePath = CLIENT_FILE) {
249260
details: entry,
250261
}))
251262
)
263+
.concat(
264+
errorPathPublishReport
265+
.filter((entry) => entry.isInErrorPath)
266+
.map((entry) => ({
267+
kind: `error-path-publish`,
268+
severity: `warning`,
269+
title: `Subscriber-facing call in error handler: ${entry.callee} in ${entry.method}`,
270+
message:
271+
`${entry.method} calls ${entry.callee} at line ${entry.callLine} inside a ` +
272+
`${entry.context} block (line ${entry.contextLine}). Publishing messages to subscribers ` +
273+
`from error/retry paths can deliver stale or partial data. Error handlers should ` +
274+
`clean up and retry, not notify subscribers.`,
275+
file: filePath,
276+
line: entry.callLine,
277+
locations: [
278+
{ file: filePath, line: entry.contextLine, label: entry.context },
279+
{ file: filePath, line: entry.callLine, label: `subscriber call` },
280+
],
281+
details: entry,
282+
}))
283+
)
252284

253285
return {
254286
sourceFile,
@@ -258,6 +290,7 @@ export function analyzeShapeStreamClient(filePath = CLIENT_FILE) {
258290
unboundedRetryReport,
259291
cacheBusterReport,
260292
tailPositionAwaitReport,
293+
errorPathPublishReport,
261294
findings,
262295
}
263296
}
@@ -515,6 +548,16 @@ export function formatAnalysisResult(result, options = {}) {
515548
)
516549
}
517550

551+
lines.push(``)
552+
lines.push(`Error-Path Publish Report:`)
553+
for (const report of result.reports.errorPathPublishReport) {
554+
const flag = report.isInErrorPath ? `!` : `-`
555+
lines.push(
556+
` ${flag} ${report.method} -> ${report.callee} ` +
557+
`(${report.context}:${report.contextLine} call:${report.callLine})`
558+
)
559+
}
560+
518561
lines.push(``)
519562
lines.push(`409 Cache Buster Report:`)
520563
for (const report of result.reports.cacheBusterReport) {
@@ -789,7 +832,8 @@ function buildUnboundedRetryReport(sourceFile, classDecl, recursiveMethods) {
789832
const recursiveNames = new Set(recursiveMethods.map((m) => m.name))
790833

791834
for (const member of classDecl.members) {
792-
if (!ts.isMethodDeclaration(member) || !member.body || !member.name) continue
835+
if (!ts.isMethodDeclaration(member) || !member.body || !member.name)
836+
continue
793837
const methodName = formatMemberName(member.name)
794838
if (!recursiveNames.has(methodName)) continue
795839

@@ -831,7 +875,8 @@ function build409CacheBusterReport(sourceFile, classDecl) {
831875
const report = []
832876

833877
for (const member of classDecl.members) {
834-
if (!ts.isMethodDeclaration(member) || !member.body || !member.name) continue
878+
if (!ts.isMethodDeclaration(member) || !member.body || !member.name)
879+
continue
835880
const methodName = formatMemberName(member.name)
836881

837882
walk(member.body, (node) => {
@@ -880,7 +925,8 @@ function build409CacheBusterReport(sourceFile, classDecl) {
880925
statusCheckLine,
881926
retryCallee: lastRetry.callee,
882927
retryLine: lastRetry.line,
883-
cacheBusterLine: cacheBusterCalls.length > 0 ? cacheBusterCalls[0].line : null,
928+
cacheBusterLine:
929+
cacheBusterCalls.length > 0 ? cacheBusterCalls[0].line : null,
884930
unconditional: hasUnconditional,
885931
})
886932
})
@@ -904,7 +950,8 @@ function buildTailPositionAwaitReport(sourceFile, classDecl, recursiveMethods) {
904950
const recursiveNames = new Set(recursiveMethods.map((m) => m.name))
905951

906952
for (const member of classDecl.members) {
907-
if (!ts.isMethodDeclaration(member) || !member.body || !member.name) continue
953+
if (!ts.isMethodDeclaration(member) || !member.body || !member.name)
954+
continue
908955
const methodName = formatMemberName(member.name)
909956
if (!recursiveNames.has(methodName)) continue
910957

@@ -930,9 +977,7 @@ function buildTailPositionAwaitReport(sourceFile, classDecl, recursiveMethods) {
930977

931978
const next = block.statements[stmtIndex + 1]
932979
const followedByBareReturn =
933-
next &&
934-
ts.isReturnStatement(next) &&
935-
!next.expression
980+
next && ts.isReturnStatement(next) && !next.expression
936981

937982
if (!followedByBareReturn) return
938983

@@ -954,6 +999,124 @@ function buildTailPositionAwaitReport(sourceFile, classDecl, recursiveMethods) {
954999
return report.sort(compareReports)
9551000
}
9561001

1002+
/**
1003+
* Detects calls to subscriber-facing methods (#publish, #onMessages) inside
1004+
* error handling paths (catch blocks, status-check if-branches that throw/retry).
1005+
* Publishing messages from error handlers can deliver stale or partial data to
1006+
* subscribers — the fix for Bug #4 removed exactly this pattern from the 409 handler.
1007+
*/
1008+
const SUBSCRIBER_METHODS = new Set([`#publish`, `#onMessages`])
1009+
1010+
function buildErrorPathPublishReport(sourceFile, classDecl) {
1011+
const report = []
1012+
1013+
for (const member of classDecl.members) {
1014+
if (!ts.isMethodDeclaration(member) || !member.body || !member.name)
1015+
continue
1016+
const methodName = formatMemberName(member.name)
1017+
1018+
// Check catch blocks
1019+
walk(member.body, (node) => {
1020+
if (ts.isCatchClause(node)) {
1021+
walk(node.block, (inner) => {
1022+
if (!ts.isCallExpression(inner)) return
1023+
const callee = getThisMemberName(inner.expression)
1024+
if (!callee || !SUBSCRIBER_METHODS.has(callee)) return
1025+
1026+
report.push({
1027+
method: methodName,
1028+
callee,
1029+
callLine: getLine(sourceFile, inner),
1030+
context: `catch`,
1031+
contextLine: getLine(sourceFile, node),
1032+
isInErrorPath: true,
1033+
})
1034+
})
1035+
return
1036+
}
1037+
1038+
// Check 4xx/5xx status-check if-blocks (e.g., if (e.status == 409))
1039+
if (ts.isIfStatement(node) && isHttpErrorStatusCheck(node.expression)) {
1040+
walk(node.thenStatement, (inner) => {
1041+
if (!ts.isCallExpression(inner)) return
1042+
const callee = getThisMemberName(inner.expression)
1043+
if (!callee || !SUBSCRIBER_METHODS.has(callee)) return
1044+
1045+
report.push({
1046+
method: methodName,
1047+
callee,
1048+
callLine: getLine(sourceFile, inner),
1049+
context: `status-${getStatusLiteral(node.expression)}`,
1050+
contextLine: getLine(sourceFile, node),
1051+
isInErrorPath: true,
1052+
})
1053+
})
1054+
}
1055+
})
1056+
}
1057+
1058+
return report.sort(compareReports)
1059+
}
1060+
1061+
/**
1062+
* Returns true if the expression checks an HTTP error status (4xx or 5xx).
1063+
* Recurses into && and || expressions.
1064+
*/
1065+
function isHttpErrorStatusCheck(expression) {
1066+
if (!ts.isBinaryExpression(expression)) return false
1067+
1068+
const op = expression.operatorToken.kind
1069+
if (
1070+
op === ts.SyntaxKind.EqualsEqualsToken ||
1071+
op === ts.SyntaxKind.EqualsEqualsEqualsToken
1072+
) {
1073+
return (
1074+
(isHttpErrorLiteral(expression.left) &&
1075+
isStatusAccess(expression.right)) ||
1076+
(isHttpErrorLiteral(expression.right) && isStatusAccess(expression.left))
1077+
)
1078+
}
1079+
1080+
if (
1081+
op === ts.SyntaxKind.AmpersandAmpersandToken ||
1082+
op === ts.SyntaxKind.BarBarToken
1083+
) {
1084+
return (
1085+
isHttpErrorStatusCheck(expression.left) ||
1086+
isHttpErrorStatusCheck(expression.right)
1087+
)
1088+
}
1089+
1090+
return false
1091+
}
1092+
1093+
function isHttpErrorLiteral(node) {
1094+
if (!ts.isNumericLiteral(node)) return false
1095+
const status = Number(node.text)
1096+
return status >= 400 && status < 600
1097+
}
1098+
1099+
function getStatusLiteral(expression) {
1100+
if (!ts.isBinaryExpression(expression)) return `unknown`
1101+
const op = expression.operatorToken.kind
1102+
if (
1103+
op === ts.SyntaxKind.EqualsEqualsToken ||
1104+
op === ts.SyntaxKind.EqualsEqualsEqualsToken
1105+
) {
1106+
if (ts.isNumericLiteral(expression.left)) return expression.left.text
1107+
if (ts.isNumericLiteral(expression.right)) return expression.right.text
1108+
}
1109+
if (
1110+
op === ts.SyntaxKind.AmpersandAmpersandToken ||
1111+
op === ts.SyntaxKind.BarBarToken
1112+
) {
1113+
const left = getStatusLiteral(expression.left)
1114+
if (left !== `unknown`) return left
1115+
return getStatusLiteral(expression.right)
1116+
}
1117+
return `unknown`
1118+
}
1119+
9571120
/**
9581121
* Returns true if the node is inside the try clause (not catch/finally) of a
9591122
* TryStatement that is a descendant of the given boundary node.
@@ -993,7 +1156,9 @@ function is409StatusCheck(expression) {
9931156
op === ts.SyntaxKind.AmpersandAmpersandToken ||
9941157
op === ts.SyntaxKind.BarBarToken
9951158
) {
996-
return is409StatusCheck(expression.left) || is409StatusCheck(expression.right)
1159+
return (
1160+
is409StatusCheck(expression.left) || is409StatusCheck(expression.right)
1161+
)
9971162
}
9981163

9991164
return false
@@ -1029,7 +1194,10 @@ function isInsideIfBlock(node, boundary) {
10291194

10301195
function classifyRetryBound(sourceFile, catchClause, callNode) {
10311196
const callLine = getLine(sourceFile, callNode)
1032-
const enclosingConditions = collectEnclosingIfConditions(catchClause, callNode)
1197+
const enclosingConditions = collectEnclosingIfConditions(
1198+
catchClause,
1199+
callNode
1200+
)
10331201

10341202
if (findPriorCounterGuard(sourceFile, catchClause.block, callLine)) {
10351203
return `counter`
@@ -1060,7 +1228,11 @@ function collectEnclosingIfConditions(catchClause, callNode) {
10601228

10611229
while (current && current !== catchClause) {
10621230
const parent = current.parent
1063-
if (parent && ts.isIfStatement(parent) && current === parent.thenStatement) {
1231+
if (
1232+
parent &&
1233+
ts.isIfStatement(parent) &&
1234+
current === parent.thenStatement
1235+
) {
10641236
conditions.push(parent.expression)
10651237
}
10661238
current = current.parent

packages/typescript-client/src/client.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -899,9 +899,9 @@ export class ShapeStream<T extends Row<unknown> = Row>
899899

900900
if (e.status == 409) {
901901
// Upon receiving a 409, start from scratch with the newly
902-
// provided shape handle. If the header is missing (e.g. proxy
903-
// stripped it), reset without a handle and use a random
904-
// cache-buster query param to ensure the retry URL is unique.
902+
// provided shape handle (if present). An unconditional cache
903+
// buster ensures the retry URL is always unique regardless of
904+
// whether the server returns a new, same, or missing handle.
905905

906906
// Store the current shape URL as expired to avoid future 409s
907907
if (this.#syncState.handle) {

packages/typescript-client/test/client.test.ts

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2895,16 +2895,22 @@ it(
28952895
if (isSSE && (!initialHandle || reqHandle === initialHandle)) {
28962896
initialHandle ??= reqHandle!
28972897
sseRequestCount++
2898-
// Handle up to 4 SSE requests (we expect 3, but might see 4 due to timing)
2899-
if (sseRequestCount <= 4) {
2898+
// Handle up to 5 SSE requests (we expect 3, but might see more due to timing)
2899+
if (sseRequestCount <= 5) {
29002900
// Simulate SSE connections that close immediately by returning
29012901
// an empty stream that closes right away (simulates cached/misconfigured response)
29022902
const stream = new ReadableStream({
29032903
start(controller) {
29042904
// Close after a tiny delay to let onopen callback complete
29052905
// This simulates a connection that establishes but closes immediately
29062906
// (e.g., due to cached response or proxy misconfiguration)
2907-
setTimeout(() => controller.close(), 10)
2907+
setTimeout(() => {
2908+
try {
2909+
controller.close()
2910+
} catch {
2911+
// Stream may already be closed if aborted
2912+
}
2913+
}, 10)
29082914
},
29092915
})
29102916

@@ -2981,10 +2987,11 @@ it(
29812987
return urlObj.searchParams.get(`live_sse`) === `true`
29822988
})
29832989

2984-
// After fallback, should see 3-4 SSE requests (3 short ones trigger fallback,
2985-
// but there might be one more in flight due to async timing)
2990+
// After fallback, should see 3-5 SSE requests (3 short ones trigger fallback,
2991+
// but there may be more in flight due to async timing between the
2992+
// fallback decision and requests already dispatched)
29862993
expect(allSseRequests.length).toBeGreaterThanOrEqual(3)
2987-
expect(allSseRequests.length).toBeLessThanOrEqual(4)
2994+
expect(allSseRequests.length).toBeLessThanOrEqual(5)
29882995

29892996
unsubscribe()
29902997
} finally {

0 commit comments

Comments
 (0)