Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 115 additions & 102 deletions core/backend/src/ChangesetReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
/** @packageDocumentation
* @module ECDb
*/
import { DbChangeStage, DbOpcode, Id64String, IModelStatus } from "@itwin/core-bentley";
import { DbOpcode, Id64String, IModelStatus } from "@itwin/core-bentley";
import { IModelError } from "@itwin/core-common";
import { IModelDb } from "./IModelDb";
import { IModelNative } from "./internal/NativePlatform";
Expand Down Expand Up @@ -41,63 +41,110 @@ export class ChangesetReader implements Disposable, ChangeSource {
private _rowOptions?: RowFormatOptions;
private _propFilter: PropertyFilter = PropertyFilter.All;
private _changeIndex = 0;
private _op?: SqliteChangeOp;
private _isECTable?: boolean;
private _tableName?: string;
private _isIndirectChange?: boolean;

/** Rows fetched in the most recent native batch call. */
private _cache: IModelJsNative.ChangesetRowData[] = [];
/**
* Index of the current row in `_cache`.
* Equals `_cache.length` (i.e. out-of-bounds) when no row is active:
* initial state, after exhaustion, or after close().
*/
private _cacheIndex = 0;
/** The db used for EC schema resolution. */
public readonly db: AnyDb;

/** Returns the active cached row, throwing if no row is current.
* @internal */
private get _currentRow(): IModelJsNative.ChangesetRowData {
if (this._cacheIndex >= this._cache.length)
throw new IModelError(IModelStatus.BadRequest, "ChangesetReader: no current row — call step() first.");
return this._cache[this._cacheIndex];
}

/** Returns the batch size to use for native step() calls based on the active property filter.
* @internal */
private get _batchSize(): number {
switch (this._propFilter) {
Comment thread
soham-bentley marked this conversation as resolved.
Outdated
case PropertyFilter.InstanceKey: return 50;
case PropertyFilter.BisCoreElement: return 25;
default: return 10;
}
}

/**
* `true` when the current row belongs to an EC-mapped table.
* Valid only after a successful call to [[step]].
* @throws [[IModelError]] if called before a successful [[step]] call.
* @beta
*/
public get isECTable(): boolean {
if (this._isECTable === undefined)
throw new IModelError(IModelStatus.BadRequest, "ChangesetReader.isECTable is only valid after step() has positioned the reader on a current valid change.");
return this._isECTable;
}
public get isECTable(): boolean { return this._currentRow.metadata.isECTable; }

/**
* Name of the SQLite table for the current change row.
* Valid only after a successful call to [[step]].
* @throws [[IModelError]] if called before a successful [[step]] call.
* @beta
*/
public get tableName(): string {
if (this._tableName === undefined)
throw new IModelError(IModelStatus.BadRequest, "ChangesetReader.tableName is only valid after step() has positioned the reader on a current valid change.");
return this._tableName;
}
public get tableName(): string { return this._currentRow.metadata.tableName; }

/**
* `true` when the current change was applied indirectly
* Valid only after a successful call to [[step]].
* @throws [[IModelError]] if called before a successful [[step]] call.
* @beta
*/
public get isIndirectChange(): boolean {
if (this._isIndirectChange === undefined)
throw new IModelError(IModelStatus.BadRequest, "ChangesetReader.isIndirectChange is only valid after step() has positioned the reader on a current valid change.");
return this._isIndirectChange;
}
public get isIndirectChange(): boolean { return this._currentRow.metadata.isIndirectChange; }

/**
* Post-change (inserted or updated-new) EC instance, populated after each [[step]] call.
* Post-change (inserted or updated-new) EC instance, computed lazily after each [[step]] call.
* `undefined` when the current row is a Delete or a non-EC table row or [[step]] returned false.
* @beta
*/
public inserted?: ChangeInstance;
public get inserted(): ChangeInstance | undefined {
Comment thread
rschili marked this conversation as resolved.
const row = this._cacheIndex < this._cache.length ? this._cache[this._cacheIndex] : undefined;
if (!row || !row.metadata.isECTable || !row.newValues)
return undefined;
const op = this.op;
return {
Comment thread
rschili marked this conversation as resolved.
Outdated
...row.newValues.data,
$meta: {
op,
tables: [row.metadata.tableName],
changeIndexes: [this._changeIndex],
stage: "New",
instanceKey: row.newValues.key,
propFilter: this._propFilter,
changeFetchedPropNames: row.newValues.changeFetchedPropNames,
rowOptions: this._rowOptions,
isIndirectChange: row.metadata.isIndirectChange,
},
};
}

/**
* Pre-change (deleted or updated-old) EC instance, populated after each [[step]] call.
* Pre-change (deleted or updated-old) EC instance, computed lazily after each [[step]] call.
* `undefined` when the current row is an Insert or a non-EC table row or [[step]] returned false.
* @beta
*/
public deleted?: ChangeInstance;
public get deleted(): ChangeInstance | undefined {
const row = this._cacheIndex < this._cache.length ? this._cache[this._cacheIndex] : undefined;
if (!row || !row.metadata.isECTable || !row.oldValues)
return undefined;
const op = this.op;
return {
...row.oldValues.data,
$meta: {
op,
tables: [row.metadata.tableName],
changeIndexes: [this._changeIndex],
stage: "Old",
instanceKey: row.oldValues.key,
propFilter: this._propFilter,
changeFetchedPropNames: row.oldValues.changeFetchedPropNames,
rowOptions: this._rowOptions,
isIndirectChange: row.metadata.isIndirectChange,
},
};
}

// Private — callers use static factory methods.
private constructor(db: AnyDb) {
Expand Down Expand Up @@ -262,6 +309,13 @@ export class ChangesetReader implements Disposable, ChangeSource {
return reader;
}

/** Throws if [[step]] has already been called, preventing filter/mode changes mid-iteration.
* @internal */
private throwIfAlreadyStepped(): void {
if (this._changeIndex > 0)
throw new IModelError(IModelStatus.BadRequest, "ChangesetReader: filters and strict mode must be configured before the first call to step().");
}
Comment thread
soham-bentley marked this conversation as resolved.

/** Handle errors that occur while auto closing the reader if there is also an error while opening the reader */
private handleCloseErrorWhileOpening(e: unknown): void {
try {
Expand All @@ -283,21 +337,23 @@ export class ChangesetReader implements Disposable, ChangeSource {
* That means the rows for changes from other tables will be skipped entirely and won't be visible through the reader.
* @param tableNames SQLite table names to include.
* Note: Table names must be provided in the correct case for proper filtering.
* @throws if the native layer encounters an error while setting the filter.
* @throws if [[step]] has already been called and the reader successfully stepped at least once(i.e. returned true for a step() call) or if the native layer encounters an error while setting the filter.
* @beta
*/
public setTableNameFilters(tableNames: Set<string>): void {
this.throwIfAlreadyStepped();
this._nativeReader.setTableNameFilters(Array.from(tableNames));
}

/**
* Restrict iteration to changes with the given operation types.
* That means the rows for changes with other operation types will be skipped entirely and won't be visible through the reader.
* @param ops Operations to include.
* @throws if the native layer encounters an error while setting the filter.
* @throws if [[step]] has already been called and the reader successfully stepped at least once(i.e. returned true for a step() call) or if the native layer encounters an error while setting the filter.
* @beta
*/
public setOpCodeFilters(ops: Set<SqliteChangeOp>): void {
this.throwIfAlreadyStepped();
this._nativeReader.setOpCodeFilters(Array.from(ops));
}

Expand All @@ -306,37 +362,42 @@ export class ChangesetReader implements Disposable, ChangeSource {
* That means the rows for changes from other EC classes will be skipped entirely and won't be visible through the reader.
* @param classNames EC class names to include. The classNames should be in the full name format(i.e. "SchemaName:ClassName").
* Note: Schema names and class names must be provided in the correct case for proper filtering. Derived classes are not automatically included, so they must be specified explicitly if needed.
* @throws if the native layer encounters an error while setting the filter.
* @throws if [[step]] has already been called and the reader successfully stepped at least once(i.e. returned true for a step() call) or if the native layer encounters an error while setting the filter.
* @beta
*/
public setClassNameFilters(classNames: Set<string>): void {
this.throwIfAlreadyStepped();
this._nativeReader.setClassNameFilters(Array.from(classNames));
}

/**
* Remove the table-name filters
* @throws if the native layer encounters an error.
* @throws if [[step]] has already been called and the reader successfully stepped at least once(i.e. returned true for a step() call) or if the native layer encounters an error.
* @beta
*/
public clearTableNameFilters(): void {
this.throwIfAlreadyStepped();
this._nativeReader.clearTableNameFilters();
}

/**
* Remove the op-code filters
* @throws if the native layer encounters an error.
* @throws if [[step]] has already been called and the reader successfully stepped at least once(i.e. returned true for a step() call) or if the native layer encounters an error.
* @beta
*/
public clearOpCodeFilters(): void {
this.throwIfAlreadyStepped();
this._nativeReader.clearOpCodeFilters();
}

/**
* Remove the class-name filters
* @throws if [[step]] has already been called and the reader successfully stepped at least once(i.e. returned true for a step() call) or if the native layer encounters an error.
* @throws if the native layer encounters an error.
* @beta
*/
Comment thread
soham-bentley marked this conversation as resolved.
public clearClassNameFilters(): void {
this.throwIfAlreadyStepped();
this._nativeReader.clearClassNameFilters();
}

Expand All @@ -359,10 +420,11 @@ export class ChangesetReader implements Disposable, ChangeSource {
* exactly the schema that was in effect when the changeset was written.
*
* @see [[disableStrictMode]] — the default (lenient) behaviour.
* @throws if the native layer encounters an error.
* @throws if [[step]] has already been called and the reader successfully stepped at least once(i.e. returned true for a step() call) or if the native layer encounters an error.
* @beta
*/
public enableStrictMode(): void {
this.throwIfAlreadyStepped();
this._nativeReader.enableStrictMode();
}

Expand All @@ -377,10 +439,11 @@ export class ChangesetReader implements Disposable, ChangeSource {
* missing columns are silently ignored.
*
* @see [[enableStrictMode]] — throw on column-count mismatches instead.
* @throws if the native layer encounters an error.
* @throws if [[step]] has already been called and the reader successfully stepped at least once(i.e. returned true for a step() call) or if the native layer encounters an error.
* @beta
*/
public disableStrictMode(): void {
this.throwIfAlreadyStepped();
this._nativeReader.disableStrictMode();
}

Expand All @@ -396,70 +459,23 @@ export class ChangesetReader implements Disposable, ChangeSource {
* @beta
*/
public step(): boolean {
Comment thread
rschili marked this conversation as resolved.
this.inserted = undefined;
this.deleted = undefined;
this._op = undefined;
this._isECTable = undefined;
this._tableName = undefined;
this._isIndirectChange = undefined;

if (this._nativeReader.step()) {
this._changeIndex++;
const meta = this._nativeReader.getChangeMetadata();
const nativeOp = meta.opCode;
const op: SqliteChangeOp = nativeOp === DbOpcode.Insert ? "Inserted" : nativeOp === DbOpcode.Update ? "Updated" : "Deleted";
this._op = op;

this._tableName = meta.tableName;
this._isIndirectChange = meta.isIndirectChange;
this._isECTable = meta.isECTable;

if (this._cacheIndex + 1 < this._cache.length) {
// Still have rows in cache — advance the pointer
this._cacheIndex++;
} else {
// Cache empty or fully consumed — fetch next batch from native
const nativeRowOpts = this._rowOptions ? this.toNativeRowOptions(this._rowOptions) : {};

if (op === "Inserted" || op === "Updated") {
const rowValue = this._nativeReader.getValue(DbChangeStage.New, nativeRowOpts);
if (rowValue !== undefined) {
this.inserted = {
...rowValue.data,
$meta: {
op,
tables: [this._tableName],
changeIndexes: [this._changeIndex],
stage: "New",
instanceKey: rowValue.key,
propFilter: this._propFilter,
changeFetchedPropNames: rowValue.changeFetchedPropNames,
rowOptions: this._rowOptions,
isIndirectChange: this._isIndirectChange,
},
};
}
const rows = this._nativeReader.step(this._batchSize, nativeRowOpts);
if (rows.length === 0) {
this._cache = [];
this._cacheIndex = 0;
return false;
}

if (op === "Deleted" || op === "Updated") {
const rowValue = this._nativeReader.getValue(DbChangeStage.Old, nativeRowOpts);
if (rowValue !== undefined) {
this.deleted = {
...rowValue.data,
$meta: {
op,
tables: [this._tableName],
changeIndexes: [this._changeIndex],
stage: "Old",
instanceKey: rowValue.key,
propFilter: this._propFilter,
changeFetchedPropNames: rowValue.changeFetchedPropNames,
rowOptions: this._rowOptions,
isIndirectChange: this._isIndirectChange,
},
};
}
}

return true;
this._cache = rows;
this._cacheIndex = 0;
}

return false;
this._changeIndex++;
return true;
}

/**
Expand All @@ -469,9 +485,10 @@ export class ChangesetReader implements Disposable, ChangeSource {
* @beta
*/
public get op(): SqliteChangeOp {
if (this._op === undefined)
throw new IModelError(IModelStatus.BadRequest, "ChangesetReader.op is only valid after step() has positioned the reader on a current valid change.");
return this._op;
const opCode = this._currentRow.metadata.opCode;
return opCode === DbOpcode.Insert ? "Inserted"
: opCode === DbOpcode.Update ? "Updated"
: "Deleted";
}

// ---------------------------------------------------------------------------
Expand All @@ -488,12 +505,8 @@ export class ChangesetReader implements Disposable, ChangeSource {
*/
public close(): void {
this._changeIndex = 0;
this._op = undefined;
this._isECTable = undefined;
this._tableName = undefined;
this._isIndirectChange = undefined;
this.inserted = undefined;
this.deleted = undefined;
this._cache = [];
this._cacheIndex = 0;
this._nativeReader.close();
}

Expand Down
Loading
Loading