-
Notifications
You must be signed in to change notification settings - Fork 6
fix abort controller handling #576
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,12 +4,12 @@ import { wrapResult } from "./registry/utils.js"; | |
| import { | ||
| type Config, | ||
| type AutoWaitConfig, | ||
| type ChainName, | ||
| type PollingController, | ||
| type Signer, | ||
| checkWait, | ||
| extractBaseUrl, | ||
| type ChainName, | ||
| getBaseUrl, | ||
| type Signal, | ||
| type Signer, | ||
| normalize, | ||
| validateTableName, | ||
| } from "./helpers/index.js"; | ||
|
|
@@ -83,7 +83,7 @@ export class Database<D = unknown> { | |
| * in the sequence fails, then an error is returned for that specific | ||
| * statement, and it aborts or rolls back the entire sequence. | ||
| * @param statements A set of Statement objects to batch and submit. | ||
| * @param opts Additional options to control execution. | ||
| * @param controller An optional object used to control receipt polling behavior. | ||
| * @returns An array of run results. | ||
| */ | ||
| // Note: if we want this package to mirror the D1 package in a way that | ||
|
|
@@ -92,8 +92,8 @@ export class Database<D = unknown> { | |
| // D1-ORM is a good example: https://github.com/Interactions-as-a-Service/d1-orm/ | ||
| async batch<T = D>( | ||
| statements: Statement[], | ||
| opts: Signal = {} | ||
| // reads returns an Array with legnth equal to the number of batched statements, | ||
| controller?: PollingController | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Couple things here:
|
||
| // reads returns an Array with length equal to the number of batched statements, | ||
| // everything else a single result wrapped in an Array for backward compatability. | ||
| ): Promise<Array<Result<T>>> { | ||
| try { | ||
|
|
@@ -124,7 +124,9 @@ export class Database<D = unknown> { | |
| // and return an Array of the query results. | ||
| if (type === "read") { | ||
| return await Promise.all( | ||
| statements.map(async (stmt) => await stmt.all<T>(undefined, opts)) | ||
| statements.map( | ||
| async (stmt) => await stmt.all<T>(undefined, controller) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm guessing this won't work... the timeout would be cancelled after the first result is returned. Sound right? This might need some special handling... ie, create a new controller for each statement (clone of |
||
| ) | ||
| ); | ||
| } | ||
|
|
||
|
|
@@ -135,7 +137,8 @@ export class Database<D = unknown> { | |
| await execCreateMany( | ||
| this.config, | ||
| statements.map((stmt) => stmt.toString()) | ||
| ) | ||
| ), | ||
| controller | ||
| ); | ||
|
|
||
| // TODO: wrapping in an Array is required for back compat, consider changing this for next major | ||
|
|
@@ -160,7 +163,8 @@ export class Database<D = unknown> { | |
|
|
||
| const receipt = await checkWait( | ||
| this.config, | ||
| await execMutateMany(this.config, runnables) | ||
| await execMutateMany(this.config, runnables), | ||
| controller | ||
| ); | ||
|
|
||
| // TODO: wrapping in an Array is required for back compat, consider changing this for next major | ||
|
|
@@ -186,19 +190,19 @@ export class Database<D = unknown> { | |
| * transaction. In the future, more "intelligent" transaction planning, | ||
| * splitting, and batching may be used. | ||
| * @param statementStrings A set of SQL statement strings separated by semi-colons. | ||
| * @param opts Additional options to control execution. | ||
| * @param controller An optional object used to control receipt polling behavior. | ||
| * @returns A single run result. | ||
| */ | ||
| async exec<T = D>( | ||
| statementStrings: string, | ||
| opts: Signal = {} | ||
| controller?: PollingController | ||
| ): Promise<Result<T>> { | ||
| // TODO: Note that this method appears to be the wrong return type in practice. | ||
| try { | ||
| const { statements } = await normalize(statementStrings); | ||
| const count = statements.length; | ||
| const statement = this.prepare(statementStrings); | ||
| const result = await statement.run(opts); | ||
| const result = await statement.run(controller); | ||
| // Adds a count property which isn't typed | ||
| result.meta.count = count; | ||
| return result; | ||
|
|
@@ -213,9 +217,9 @@ export class Database<D = unknown> { | |
| /** | ||
| * Export a (set of) tables to the SQLite binary format. | ||
| * Not implemented yet! | ||
| * @param _opts Additional options to control execution. | ||
| * @param controller An optional object used to control receipt polling behavior. | ||
| */ | ||
| async dump(_opts: Signal = {}): Promise<ArrayBuffer> { | ||
| async dump(_controller?: PollingController): Promise<ArrayBuffer> { | ||
| throw errorWithCause("DUMP_ERROR", new Error("not implemented yet")); | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,17 +1,19 @@ | ||
| export type Awaitable<T> = T | PromiseLike<T>; | ||
|
|
||
| export interface Signal { | ||
| signal?: AbortSignal; | ||
| signal: AbortSignal; | ||
| abort: () => void; | ||
| } | ||
|
|
||
| export interface Interval { | ||
| interval?: number; | ||
| interval: number; | ||
| cancel: () => void; | ||
| } | ||
|
|
||
| export type SignalAndInterval = Signal & Interval; | ||
| export type PollingController = Signal & Interval; | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changing this to a name that more closely describes what it does. I had to dig into the source code to figure this out when I first saw this. |
||
|
|
||
| export interface Wait<T = unknown> { | ||
| wait: (opts?: SignalAndInterval) => Promise<T>; | ||
| wait: (controller?: PollingController) => Promise<T>; | ||
| } | ||
|
|
||
| export interface AsyncData<T> { | ||
|
|
@@ -21,36 +23,41 @@ export interface AsyncData<T> { | |
|
|
||
| export type AsyncFunction<T> = () => Awaitable<AsyncData<T>>; | ||
|
|
||
| export function getAbortSignal( | ||
| signal?: AbortSignal, | ||
| maxTimeout: number = 60_000 | ||
| ): { | ||
| signal: AbortSignal; | ||
| timeoutId: ReturnType<typeof setTimeout> | undefined; | ||
| } { | ||
| let abortSignal: AbortSignal; | ||
| let timeoutId; | ||
| if (signal == null) { | ||
| const controller = new AbortController(); | ||
| abortSignal = controller.signal; | ||
| // return the timeoutId so the caller can cleanup | ||
| timeoutId = setTimeout(function () { | ||
| export function createSignal(): Signal { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Helper method for use with read-only APIs |
||
| const controller = new AbortController(); | ||
| return { | ||
| signal: controller.signal, | ||
| abort: () => { | ||
| controller.abort(); | ||
| }, maxTimeout); | ||
| } else { | ||
| abortSignal = signal; | ||
| } | ||
| return { signal: abortSignal, timeoutId }; | ||
| }, | ||
| }; | ||
| } | ||
|
|
||
| export function createPollingController( | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Helper method for mutating APIs. |
||
| timeout: number = 60_000, | ||
| pollingInterval: number = 1500 | ||
| ): PollingController { | ||
| const controller = new AbortController(); | ||
| const timeoutId = setTimeout(function () { | ||
| controller.abort(); | ||
| }, timeout); | ||
| return { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This object doubles as an options object for fetch and as a controller for the user to call |
||
| signal: controller.signal, | ||
| abort: () => { | ||
| controller.abort(); | ||
| }, | ||
| interval: pollingInterval, | ||
| cancel: () => { | ||
| clearTimeout(timeoutId); | ||
| }, | ||
| }; | ||
| } | ||
|
|
||
| export async function getAsyncPoller<T = unknown>( | ||
| fn: AsyncFunction<T>, | ||
| interval: number = 1500, | ||
| signal?: AbortSignal | ||
| controller?: PollingController | ||
| ): Promise<T> { | ||
| // in order to set a timeout other than 10 seconds you need to | ||
| // create and pass in an abort signal with a different timeout | ||
| const { signal: abortSignal, timeoutId } = getAbortSignal(signal, 10_000); | ||
| const control = controller ?? createPollingController(); | ||
| const checkCondition = ( | ||
| resolve: (value: T) => void, | ||
| reject: (reason?: any) => void | ||
|
|
@@ -59,15 +66,15 @@ export async function getAsyncPoller<T = unknown>( | |
| .then((result: AsyncData<T>) => { | ||
| if (result.done && result.data != null) { | ||
| // We don't want to call `AbortController.abort()` if the call succeeded | ||
| clearTimeout(timeoutId); | ||
| control.cancel(); | ||
| return resolve(result.data); | ||
| } | ||
| if (abortSignal.aborted) { | ||
| if (control.signal.aborted) { | ||
| // We don't want to call `AbortController.abort()` if the call is already aborted | ||
| clearTimeout(timeoutId); | ||
| return reject(abortSignal.reason); | ||
| control.cancel(); | ||
| return reject(control.signal.reason); | ||
| } else { | ||
| setTimeout(checkCondition, interval, resolve, reject); | ||
| setTimeout(checkCondition, control.interval, resolve, reject); | ||
| } | ||
| }) | ||
| .catch((err) => { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,5 @@ | ||
| import { type WaitableTransactionReceipt } from "../registry/utils.js"; | ||
| import { type PollingController } from "./await.js"; | ||
| import { type ChainName, getBaseUrl } from "./chains.js"; | ||
| import { type Signer, type ExternalProvider, getSigner } from "./ethers.js"; | ||
|
|
||
|
|
@@ -26,10 +27,11 @@ export interface AliasesNameMap { | |
|
|
||
| export async function checkWait( | ||
| config: Config & Partial<AutoWaitConfig>, | ||
| receipt: WaitableTransactionReceipt | ||
| receipt: WaitableTransactionReceipt, | ||
| controller?: PollingController | ||
| ): Promise<WaitableTransactionReceipt> { | ||
| if (config.autoWait ?? false) { | ||
| const waited = await receipt.wait(); | ||
| const waited = await receipt.wait(controller); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the actual fix for the polling handling. |
||
| return { ...receipt, ...waited }; | ||
| } | ||
| return receipt; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -252,10 +252,10 @@ function catchNotFound(err: unknown): [] { | |
| export async function queryRaw<T = unknown>( | ||
| config: ReadConfig, | ||
| statement: string, | ||
| opts: Signal = {} | ||
| signal?: Signal | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Renamed. As mentioned above for controller, this isn't really an "options" object.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We left it as options in case we wanted to add additional opts to it at a later date.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see. So you would have unioned
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes we could have, but naw it is probably fine. Worst case is that we have to change the parameter name in the future. But since it is already an object, it'll be a pretty safe change. |
||
| ): Promise<Array<ValueOf<T>>> { | ||
| const params = { statement, format: "table" } as const; | ||
| const response = await getQuery<T>(config, params, opts) | ||
| const response = await getQuery<T>(config, params, signal) | ||
| .then((res) => res.rows) | ||
| .catch(catchNotFound); | ||
| return response; | ||
|
|
@@ -264,19 +264,21 @@ export async function queryRaw<T = unknown>( | |
| export async function queryAll<T = unknown>( | ||
| config: ReadConfig, | ||
| statement: string, | ||
| opts: Signal = {} | ||
| signal?: Signal | ||
| ): Promise<ObjectsFormat<T>> { | ||
| const params = { statement, format: "objects" } as const; | ||
| const response = await getQuery<T>(config, params, opts).catch(catchNotFound); | ||
| const response = await getQuery<T>(config, params, signal).catch( | ||
| catchNotFound | ||
| ); | ||
| return response; | ||
| } | ||
|
|
||
| export async function queryFirst<T = unknown>( | ||
| config: ReadConfig, | ||
| statement: string, | ||
| opts: Signal = {} | ||
| signal?: Signal | ||
| ): Promise<T | null> { | ||
| const response = await queryAll<T>(config, statement, opts).catch( | ||
| const response = await queryAll<T>(config, statement, signal).catch( | ||
| catchNotFound | ||
| ); | ||
| return response.shift() ?? null; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's a good view of the API. Use a helper method to create a "controller" that is passed to the
.all,.run, etc. methods. The user may use this controller to abort the request via itsabortmethod.