Skip to content

Commit 56c338a

Browse files
authored
fix: Catch and surface body consumption errors in regular flow (#2500)
Closes #2497 - Wrap fetch such that we ensure that we can not only complete the request but also consume the fetch body in full - and any errors arising from that are still surfaced as `FetchError` - Always throw errors from our fetch even if the status code is not in the `>= 400 && < 500` range - if we don't, it fails in the next step as `response` is undefined. In this way, errors during the consumption of the body stream are handled the same as other network errors, interrupting the stream, notifying subscribers, etc.
1 parent 85b863a commit 56c338a

File tree

6 files changed

+83
-9
lines changed

6 files changed

+83
-9
lines changed

.changeset/forty-chefs-cheer.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@electric-sql/client": patch
3+
---
4+
5+
Surface errors from consuming response body as `FetchError`s in regular handling flow

packages/sync-service/.env.dev

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ OTHER_DATABASE_URL=postgresql://postgres:password@localhost:54322/electric?sslmo
1010
ELECTRIC_PROFILE_WHERE_CLAUSES=false
1111
ELECTRIC_OTEL_SAMPLING_RATIO=1
1212
ELECTRIC_OTEL_DEBUG=false
13+
ElECTIRC_INSECURE=true

packages/typescript-client/src/client.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import {
2121
BackoffOptions,
2222
createFetchWithBackoff,
2323
createFetchWithChunkBuffer,
24+
createFetchWithConsumedMessages,
2425
createFetchWithResponseHeadersCheck,
2526
} from './fetch'
2627
import {
@@ -368,8 +369,10 @@ export class ShapeStream<T extends Row<unknown> = Row>
368369
},
369370
})
370371

371-
this.#fetchClient = createFetchWithResponseHeadersCheck(
372-
createFetchWithChunkBuffer(fetchWithBackoffClient)
372+
this.#fetchClient = createFetchWithConsumedMessages(
373+
createFetchWithResponseHeadersCheck(
374+
createFetchWithChunkBuffer(fetchWithBackoffClient)
375+
)
373376
)
374377
}
375378

@@ -510,12 +513,13 @@ export class ShapeStream<T extends Row<unknown> = Row>
510513
this.#reset(newShapeHandle)
511514
await this.#publish(e.json as Message<T>[])
512515
continue
513-
} else if (e.status >= 400 && e.status < 500) {
516+
} else {
514517
// Notify subscribers
515518
this.#sendErrorToSubscribers(e)
516519

517-
// 400 errors are not actionable without additional user input,
518-
// so we exit the loop
520+
// errors that have reached this point are not actionable without
521+
// additional user input, such as 400s or failures to read the
522+
// body of a response, so we exit the loop
519523
throw e
520524
}
521525
} finally {

packages/typescript-client/src/error.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,12 @@ export class FetchError extends Error {
3333
let json: object | undefined = undefined
3434

3535
const contentType = response.headers.get(`content-type`)
36-
if (contentType && contentType.includes(`application/json`)) {
37-
json = (await response.json()) as object
38-
} else {
39-
text = await response.text()
36+
if (!response.bodyUsed) {
37+
if (contentType && contentType.includes(`application/json`)) {
38+
json = (await response.json()) as object
39+
} else {
40+
text = await response.text()
41+
}
4042
}
4143

4244
return new FetchError(status, text, json, headers, url)

packages/typescript-client/src/fetch.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,32 @@ export function createFetchWithBackoff(
9595
}
9696
}
9797

98+
// Ensure body can actually be read in its entirety
99+
export function createFetchWithConsumedMessages(fetchClient: typeof fetch) {
100+
return async (...args: Parameters<typeof fetch>): Promise<Response> => {
101+
const url = args[0]
102+
const res = await fetchClient(...args)
103+
try {
104+
if (res.body === null) return res
105+
const text = await res.text()
106+
return new Response(text, res)
107+
} catch (err) {
108+
throw new FetchError(
109+
res.status,
110+
undefined,
111+
undefined,
112+
Object.fromEntries([...res.headers.entries()]),
113+
url.toString(),
114+
err instanceof Error
115+
? err.message
116+
: typeof err === `string`
117+
? err
118+
: `failed to read body`
119+
)
120+
}
121+
}
122+
}
123+
98124
interface ChunkPrefetchOptions {
99125
maxChunksToPrefetch: number
100126
}

packages/typescript-client/test/integration.test.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -866,6 +866,42 @@ describe(`HTTP Sync`, () => {
866866
})
867867
})
868868

869+
it(`should handle invalid requests by terminating stream`, async ({
870+
expect,
871+
issuesTableUrl,
872+
aborter,
873+
}) => {
874+
let error: Error
875+
const invalidIssueStream = new ShapeStream<IssueRow>({
876+
url: `${BASE_URL}/v1/shape`,
877+
params: {
878+
table: issuesTableUrl,
879+
where: `1=1`,
880+
},
881+
signal: aborter.signal,
882+
// handle: streamState.handle,
883+
onError: (err) => {
884+
error = err
885+
},
886+
fetchClient: async (...args) => {
887+
const res = await fetch(...args)
888+
await res.text()
889+
return res
890+
},
891+
})
892+
893+
const errorSubscriberPromise = new Promise((_, reject) =>
894+
invalidIssueStream.subscribe(() => {}, reject)
895+
)
896+
897+
await expect(errorSubscriberPromise).rejects.toThrow(FetchError)
898+
expect(invalidIssueStream.error).instanceOf(FetchError)
899+
expect(invalidIssueStream.isConnected()).false
900+
expect(error!.message).contains(
901+
`Body is unusable: Body has already been read`
902+
)
903+
})
904+
869905
it(`should detect shape deprecation and restart syncing`, async ({
870906
expect,
871907
insertIssues,

0 commit comments

Comments
 (0)