@@ -34,6 +34,8 @@ import {
3434} from './MongoRelation.js' ;
3535import { MongoSnapshotter } from './MongoSnapshotter.js' ;
3636import { CHECKPOINTS_COLLECTION , timestampToDate } from './replication-utils.js' ;
37+ import { staticFilterToMongoExpression } from './staticFilters.js' ;
38+ import { inspect } from 'node:util' ;
3739
3840export interface ChangeStreamOptions {
3941 connections : MongoManager ;
@@ -212,6 +214,7 @@ export class ChangeStream {
212214 private getSourceNamespaceFilters ( writer : BucketDataWriter ) : {
213215 $match : any ;
214216 multipleDatabases : boolean ;
217+ filters : any [ ] ;
215218 } {
216219 const sourceTables = writer . rowProcessor . getSourceTables ( ) ;
217220
@@ -230,20 +233,31 @@ export class ChangeStream {
230233 multipleDatabases = true ;
231234 }
232235
236+ let filterExpression =
237+ tablePattern . filter == null
238+ ? { $literal : true }
239+ : staticFilterToMongoExpression ( tablePattern . filter , { columnPrefix : '$fullDocument.' } ) ;
240+
233241 if ( tablePattern . isWildcard ) {
234242 $refilters . push ( {
235243 'ns.db' : tablePattern . schema ,
236244 'ns.coll' : new RegExp ( '^' + escapeRegExp ( tablePattern . tablePrefix ) )
237245 } ) ;
238246 filters . push ( {
239247 'ns.db' : tablePattern . schema ,
240- 'ns.coll' : new RegExp ( '^' + escapeRegExp ( tablePattern . tablePrefix ) )
248+ 'ns.coll' : new RegExp ( '^' + escapeRegExp ( tablePattern . tablePrefix ) ) ,
249+ $expr : filterExpression
241250 } ) ;
242251 } else {
243252 $inFilters . push ( {
244253 db : tablePattern . schema ,
245254 coll : tablePattern . name
246255 } ) ;
256+ filters . push ( {
257+ 'ns.db' : tablePattern . schema ,
258+ 'ns.coll' : tablePattern . name ,
259+ $expr : filterExpression
260+ } ) ;
247261 }
248262 }
249263
@@ -265,9 +279,9 @@ export class ChangeStream {
265279 : // collection-level: filter on coll only
266280 { 'ns.coll' : { $in : $inFilters . map ( ( ns ) => ns . coll ) } } ;
267281 if ( $refilters . length > 0 ) {
268- return { $match : { $or : [ nsFilter , ...$refilters ] } , multipleDatabases } ;
282+ return { $match : { $or : [ nsFilter , ...$refilters ] } , multipleDatabases, filters } ;
269283 }
270- return { $match : nsFilter , multipleDatabases } ;
284+ return { $match : nsFilter , multipleDatabases, filters } ;
271285 }
272286
273287 private async checkPostImages ( db : string , collectionInfo : mongo . CollectionInfo ) {
@@ -420,6 +434,12 @@ export class ChangeStream {
420434 {
421435 $match : filters . $match
422436 } ,
437+ // Disabled for now: enabling the stage below currently fails with "resumeToken not found"
438+ // {
439+ // $match: {
440+ // $or: filters.filters
441+ // }
442+ // },
423443 { $changeStreamSplitLargeEvent : { } }
424444 ] ;
425445
@@ -485,37 +505,56 @@ export class ChangeStream {
485505 // Ignore the postImages check in this case.
486506 }
487507
488- const result = await writer . resolveTables ( {
489- connection_id : this . connection_id ,
490- connection_tag : this . connections . connectionTag ,
491- entity_descriptor : descriptor
508+ // What happens here:
509+ // 1. We see a new collection that we haven't observed before.
510+ // 2. We check which table pattern(s) match this collection, _regardless of specific row filters_.
511+ // 3. We resolve the tables for those patterns.
512+
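513+ // FIXME: avoid a linear scan over all source tables each time a new collection is seen
514+ // FIXME: handle wildcard patterns - the filter below only compares exact schema and name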
515+ const patterns = writer . rowProcessor . getSourceTables ( ) . filter ( ( t ) => {
516+ return (
517+ t . connectionTag == this . connections . connectionTag && t . name == descriptor . name && t . schema == descriptor . schema
518+ ) ;
492519 } ) ;
493520
494- const snapshot = options . snapshot ;
495- this . relationCache . set ( getCacheIdentifier ( descriptor ) , result . tables ) ;
521+ let allTables : SourceTable [ ] = [ ] ;
522+ for ( let pattern of patterns ) {
523+ const result = await writer . resolveTables ( {
524+ connection_id : this . connection_id ,
525+ connection_tag : this . connections . connectionTag ,
526+ entity_descriptor : descriptor ,
527+ pattern : pattern
528+ } ) ;
496529
497- // Drop conflicting collections.
498- // This is generally not expected for MongoDB source dbs, so we log an error.
499- if ( result . dropTables . length > 0 ) {
500- this . logger . error (
501- `Conflicting collections found for ${ JSON . stringify ( descriptor ) } . Dropping: ${ result . dropTables . map ( ( t ) => t . id ) . join ( ', ' ) } `
502- ) ;
503- await writer . drop ( result . dropTables ) ;
504- }
530+ const snapshot = options . snapshot ;
531+ this . relationCache . set ( getCacheIdentifier ( descriptor ) , result . tables ) ;
505532
506- // Snapshot if:
507- // 1. Snapshot is requested (false for initial snapshot, since that process handles it elsewhere)
508- // 2. Snapshot is not already done, AND:
509- // 3. The table is used in sync rules.
510- for ( let table of result . tables ) {
511- const shouldSnapshot = snapshot && ! table . snapshotComplete && table . syncAny ;
512- if ( shouldSnapshot ) {
513- this . logger . info ( `New collection: ${ descriptor . schema } .${ descriptor . name } ` ) ;
514- await this . snapshotter . queueSnapshot ( writer , table ) ;
533+ // Drop conflicting collections.
534+ // This is generally not expected for MongoDB source dbs, so we log an error.
535+ if ( result . dropTables . length > 0 ) {
536+ this . logger . error (
537+ `Conflicting collections found for ${ JSON . stringify ( descriptor ) } . Dropping: ${ result . dropTables . map ( ( t ) => t . id ) . join ( ', ' ) } `
538+ ) ;
539+ await writer . drop ( result . dropTables ) ;
515540 }
541+
542+ // Snapshot if:
543+ // 1. Snapshot is requested (false for initial snapshot, since that process handles it elsewhere)
544+ // 2. Snapshot is not already done, AND:
545+ // 3. The table is used in sync rules.
546+ for ( let table of result . tables ) {
547+ const shouldSnapshot = snapshot && ! table . snapshotComplete && table . syncAny ;
548+ if ( shouldSnapshot ) {
549+ this . logger . info ( `New collection: ${ descriptor . schema } .${ descriptor . name } ` ) ;
550+ await this . snapshotter . queueSnapshot ( writer , table ) ;
551+ }
552+ }
553+
554+ allTables . push ( ...result . tables ) ;
516555 }
517556
518- return result . tables ;
557+ return allTables ;
519558 }
520559
521560 private async drop ( writer : storage . BucketDataWriter , entity : SourceEntityDescriptor ) : Promise < void > {
0 commit comments