Skip to content

Commit b51377a

Browse files
committed
feat(lds): make wal2json parameters configurable
* add `LdsOptions.params` * fix `numeric-data-types-as-string` to be set only if `types` are passed * adjust the `parse` function accordingly, to handle `number` values without a type parser * set `include-type-oids` only if `types` are passed * adjust `changeToRecord`/`changeToPk` to call `parse` only if type oids are present
1 parent 6f19c6f commit b51377a

File tree

1 file changed

+47
-27
lines changed

1 file changed

+47
-27
lines changed

packages/lds/src/pg-logical-decoding.ts

Lines changed: 47 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ declare module "pg" {
1616
*/
1717
interface Keys {
1818
keynames: Array<string>;
19-
keytypes: Array<string>;
20-
keytypeoids: Array<number>;
19+
keytypes?: Array<string>; // with `include-types` option (default true)
20+
keytypeoids?: Array<number>; // with `include-type-oids` option (default false)
2121
keyvalues: Array<any>;
2222
}
2323

@@ -35,8 +35,8 @@ export interface InsertChange extends Change {
3535

3636
// https://github.com/eulerto/wal2json/blob/f81bf7af09324da656be87dfd53d20741c01e1e0/wal2json.c#L969
3737
columnnames: Array<string>;
38-
columntypes: Array<string>; // with `include-types` option (default true)
39-
columntypeoids: Array<number>; // with `include-type-oids` option (default false)
38+
columntypes?: Array<string>; // with `include-types` option (default true)
39+
columntypeoids?: Array<number>; // with `include-type-oids` option (default false)
4040
columnvalues: Array<any>;
4141
}
4242

@@ -45,8 +45,8 @@ export interface UpdateChange extends Change {
4545

4646
// https://github.com/eulerto/wal2json/blob/f81bf7af09324da656be87dfd53d20741c01e1e0/wal2json.c#L973
4747
columnnames: Array<string>;
48-
columntypes: Array<string>; // with `include-types` option (default true)
49-
columntypeoids: Array<number>; // with `include-type-oids` option (default false)
48+
columntypes?: Array<string>; // with `include-types` option (default true)
49+
columntypeoids?: Array<number>; // with `include-type-oids` option (default false)
5050
columnvalues: Array<any>;
5151

5252
// https://github.com/eulerto/wal2json/blob/f81bf7af09324da656be87dfd53d20741c01e1e0/wal2json.c#L992-L1003
@@ -60,8 +60,6 @@ export interface DeleteChange extends Change {
6060
oldkeys: Keys;
6161
}
6262

63-
const id = <T>(value: T): T => value;
64-
6563
interface Payload {
6664
lsn: string;
6765
data: {
@@ -83,44 +81,58 @@ export interface LdsOptions {
8381
temporary?: boolean;
8482
/** (Custom) [type parsers](https://node-postgres.com/features/queries#types) to deserialise the wal2json column string values. Pass `pg.types` to get the default type parsing. Defaults to `undefined`, that is raw values will get emitted. */
8583
types?: pg.CustomTypesConfig;
84+
/** Extra [parameters to be passed to wal2json](https://github.com/eulerto/wal2json?tab=readme-ov-file#parameters). Use e.g. `{'numeric-data-types-as-string', 't'}` to make the type parsers apply to numeric values. */
85+
params?: Partial<Record<string, string>>;
8686
}
8787

8888
export default class PgLogicalDecoding extends EventEmitter {
8989
public readonly slotName: string;
9090
public readonly temporary: boolean;
91-
private connectionString: string;
92-
private tablePattern: string;
91+
private readonly getChangesQueryText: string;
92+
private readonly parse: (value: any, typeOid: number) => any;
9393
private pool: pg.Pool | null;
9494
private client: Promise<pg.PoolClient> | null;
95-
private readonly parse: (value: any, typeOid: number) => any;
9695

9796
constructor(connectionString: string, options?: LdsOptions) {
9897
super();
99-
this.connectionString = connectionString;
10098
const {
10199
tablePattern = "*.*",
102100
slotName = "postgraphile",
103101
temporary = false,
104102
types,
103+
params,
105104
} = options || {};
106-
this.tablePattern = tablePattern;
107105
this.slotName = slotName;
108106
this.temporary = temporary;
107+
const parametersSql = Object.entries({
108+
"add-tables": tablePattern != "*.*" ? tablePattern : null,
109+
"include-types": "f", // type names are unnecessary
110+
"include-type-oids": types ? "t" : null,
111+
"numeric-data-types-as-string": types ? "t" : null,
112+
...params,
113+
})
114+
.flatMap(entry => (typeof entry[1] == "string" ? entry : []))
115+
.map(pg.Client.prototype.escapeLiteral)
116+
.join(", ");
117+
this.getChangesQueryText = `SELECT lsn, data FROM pg_catalog.pg_logical_slot_get_changes($1, $2, $3, ${parametersSql})`;
109118
this.parse = types
110119
? (value: any, typeOid: number) => {
111120
if (value === null) return null;
112-
// wal2json always outputs `bool`s as boolean, not as string, irregardless of `numeric-data-types-as-string`.
113-
if (typeOid === pg.types.builtins.BOOL) return value;
121+
// wal2json always outputs `bool`s as boolean
122+
if (typeof value === "boolean") return value; // assert: typeOid === pg.types.builtins.BOOL
123+
// wal2json outputs numeric data as numbers, unless `numeric-data-types-as-string` is set
124+
if (typeof value === "number") return value;
114125
const parser = types.getTypeParser(typeOid, "text");
115126
return parser(value);
116127
}
117-
: id;
128+
: (value, _) => value;
118129
// We just use the pool to get better error handling
119130
this.pool = new pg.Pool({
120-
connectionString: this.connectionString,
131+
connectionString,
121132
max: 1,
122133
});
123134
this.pool.on("error", this.onPoolError);
135+
this.client = null;
124136
}
125137

126138
public async dropStaleSlots() {
@@ -181,9 +193,9 @@ export default class PgLogicalDecoding extends EventEmitter {
181193
const client = await this.getClient();
182194
await this.trackSelf(client);
183195
try {
184-
const { rows } = await client.query({
185-
text: `SELECT lsn, data FROM pg_catalog.pg_logical_slot_get_changes($1, $2, $3, 'add-tables', $4::text, 'include-types', 'f', 'include-type-oids', 't', 'numeric-data-types-as-string', 't')`,
186-
values: [this.slotName, uptoLsn, uptoNchanges, this.tablePattern],
196+
const { rows } = await client.query<[lsn: string, data: string]>({
197+
text: this.getChangesQueryText,
198+
values: [this.slotName, uptoLsn, uptoNchanges],
187199
rowMode: "array",
188200
});
189201
return rows.map(toLsnData);
@@ -205,17 +217,25 @@ export default class PgLogicalDecoding extends EventEmitter {
205217
change: InsertChange | UpdateChange
206218
): Record<string, any> {
207219
const { columnnames, columnvalues, columntypeoids } = change;
208-
return columnnames.reduce<Record<string, any>>((memo, name, i) => {
209-
memo[name] = this.parse(columnvalues[i], columntypeoids[i]);
210-
return memo;
211-
}, {});
220+
return columnnames.reduce<Record<string, any>>(
221+
columntypeoids
222+
? (memo, name, i) => {
223+
memo[name] = this.parse(columnvalues[i], columntypeoids[i]);
224+
return memo;
225+
}
226+
: (memo, name, i) => {
227+
memo[name] = columnvalues[i];
228+
return memo;
229+
},
230+
{}
231+
);
212232
}
213233

214234
public changeToPk(change: UpdateChange | DeleteChange): any[] {
215235
const { keyvalues, keytypeoids } = change.oldkeys;
216-
return this.parse == id
217-
? keyvalues
218-
: keyvalues.map((value, i) => this.parse(value, keytypeoids[i]));
236+
return keytypeoids
237+
? keyvalues.map((value, i) => this.parse(value, keytypeoids[i]))
238+
: keyvalues;
219239
}
220240

221241
public async close() {

0 commit comments

Comments
 (0)