@@ -30,7 +30,16 @@ import 'worker.dart';
3030final class NativeSqliteDatabaseImpl extends SqliteDatabaseImpl {
3131 @override
3232 final NativeSqliteOpenFactory openFactory;
33- late final Future <SqliteConnectionPool > _pool = _openNativePool (openFactory);
33+ late final Future <SqliteConnectionPool > _pool =
34+ _openNativePool (openFactory).then ((pool) {
35+ // Pipe updates into a stream controller as soon as the pool is opened to
36+ // avoid missing updates if the stream is only listened to later.
37+ _updates.addStream (pool.updatedTables.map ((e) {
38+ return UpdateNotification (e.toSet ());
39+ }));
40+
41+ return pool;
42+ });
3443 bool _isClosed = false ;
3544 final _lockGuard = Object ();
3645
@@ -43,10 +52,11 @@ final class NativeSqliteDatabaseImpl extends SqliteDatabaseImpl {
4352
4453 final Queue <IsolateWorker > _workers;
4554
55+ final StreamController <UpdateNotification > _updates =
56+ StreamController .broadcast (sync : true );
57+
4658 @override
47- late final Stream <UpdateNotification > updates = Stream .fromFuture (_pool)
48- .asyncExpand ((pool) => pool.updatedTables
49- .map ((changedTables) => UpdateNotification (changedTables.toSet ())));
59+ Stream <UpdateNotification > get updates => _updates.stream;
5060
5161 NativeSqliteDatabaseImpl (this .openFactory)
5262 :
@@ -84,58 +94,6 @@ final class NativeSqliteDatabaseImpl extends SqliteDatabaseImpl {
8494 }
8595 }
8696
87- /// Open a read-only transaction.
88- ///
89- /// Up to [maxReaders] read transactions can run concurrently.
90- /// After that, read transactions are queued.
91- ///
92- /// Read transactions can run concurrently to a write transaction.
93- ///
94- /// Changes from any write transaction are not visible to read transactions
95- /// started before it.
96- @override
97- Future <T > readTransaction <T >(
98- Future <T > Function (SqliteReadContext tx) callback,
99- {Duration ? lockTimeout}) async {
100- return _useConnection (
101- writer: false ,
102- abortTrigger: lockTimeout? .asTimeout,
103- debugContext: 'readTransaction' ,
104- (context) {
105- return _transactionInLease (context, callback);
106- },
107- );
108- }
109-
110- /// Open a read-write transaction.
111- ///
112- /// Only a single write transaction can run at a time - any concurrent
113- /// transactions are queued.
114- ///
115- /// The write transaction is automatically committed when the callback finishes,
116- /// or rolled back on any error.
117- @override
118- Future <T > writeTransaction <T >(
119- Future <T > Function (SqliteWriteContext tx) callback,
120- {Duration ? lockTimeout}) {
121- return _useConnection (
122- writer: true ,
123- abortTrigger: lockTimeout? .asTimeout,
124- debugContext: 'writeTransaction' ,
125- (context) {
126- return _transactionInLease (context, callback);
127- },
128- );
129- }
130-
131- Future <T > _transactionInLease <T >(
132- _LeasedContext context,
133- Future <T > Function (SqliteWriteContext tx) callback,
134- ) {
135- final ctx = ScopedWriteContext (context);
136- return ctx.writeTransaction (callback).whenComplete (ctx.invalidate);
137- }
138-
13997 @override
14098 Future <T > abortableReadLock <T >(
14199 Future <T > Function (SqliteReadContext tx) callback,
0 commit comments