Skip to content

Commit 8033135

Browse files
dwcullopDarrin Cullop
andauthored
Wait for quiescence in MergeManyChangeSets stress tests (reactivemarbles#1100)
* Make MergeManyChangeSetsCacheSourceCompare stress test deterministic MultiThreadedStressTest(10, 50) fails intermittently in CI with two prices present in market.PricesCache.Items but missing from the live aggregator. The two affected prices have the latest timestamps in the batch, which is the signature of a race during high-contention production. Bogus.Randomizer wraps System.Random. When constructed with a seed, the randomizer stores the random in a protected localSeed field and bypasses its internal Locker on every generator call. The test shares one seeded Randomizer across many parallel producer threads: - Directly via _randomizer.Number / .Bool / .TimeSpan / .Interval - Indirectly via _marketFaker.WithSeed(_randomizer), since every Faker<T>.Generate call routes through the same randomizer Concurrent calls into the underlying System.Random corrupt its internal state, producing values inconsistent with what a serialized run would produce. That is sufficient to explain the observed asymmetry between the post-hoc PricesCache snapshot and the live aggregator stream. Introduce SynchronizedRandomizer, a Randomizer subclass that replaces the protected localSeed field with a LockedRandom (a Random subclass that serializes every virtual method on an internal lock). The seed and method contracts are unchanged; the wrapper only adds synchronization. Apply it to the failing fixture. Other Randomizer uses across the test project remain unchanged for now; they are either single-threaded or have not exhibited flake symptoms. Verified: 20 consecutive runs of the fixture pass at MaxParallelThreads=16, zero failures. * Wait for quiescence in MergeManyChangeSets stress tests The post-reactivemarbles#1079 cache delivery model decouples mutation from notification: AddOrUpdate enqueues a notification and returns; the actual delivery to subscribers runs later on whichever thread wins the drain. That removed the cross-cache deadlock the old Synchronize(lock) shape produced, but it opened a small window between mutation and observed delivery. Tests that compare a live aggregator's view against the cache's current Items at assert time can see disagreement during that window. The source-compare fixture already adopted the right shape: var merged = source.MergeManyChangeSets(...).Publish(); var cacheCompleted = merged.LastOrDefaultAsync().ToTask(); using var local = merged.AsAggregator(); using var connect = merged.Connect(); ... await cacheCompleted; CheckResultContents(..., local); Port the same pattern to the cache and list MergeManyChangeSets stress fixtures. The local aggregator now sits on the Publish chain so it shares the completion task; the await before CheckResultContents pins the quiescence point. Also delete the SynchronizedRandomizer change made earlier on this branch. Bogus.Randomizer takes a process-wide lock on Locker.Value for every generator call regardless of whether localSeed is set, so the wrapper was addressing a non-problem. --------- Co-authored-by: Darrin Cullop <dacullop@microsoft.com>
1 parent 46ccd1a commit 8033135

2 files changed

Lines changed: 19 additions & 7 deletions

File tree

src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Reactive.Concurrency;
66
using System.Reactive.Disposables;
77
using System.Reactive.Linq;
8+
using System.Reactive.Threading.Tasks;
89
using System.Threading.Tasks;
910
using Bogus;
1011
using DynamicData.Kernel;
@@ -90,10 +91,11 @@ IObservable<MarketPrice> AddRemovePrices(Market market, int priceCount, int para
9091
.Parallelize(priceCount, parallel, obs => obs.StressAddRemove(market.PricesCache, _ => GetRemoveTime(), scheduler))
9192
.Finally(market.PricesCache.Dispose);
9293

93-
var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices);
94-
using var priceResults = merged.AsAggregator();
95-
94+
var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices).Publish();
9695
var adding = true;
96+
var cacheCompleted = merged.LastOrDefaultAsync().ToTask();
97+
using var priceResults = merged.AsAggregator();
98+
using var connect = merged.Connect();
9799

98100
// Start asynchrononously modifying the parent list and the child lists
99101
using var addingSub = AddRemoveStress(marketCount, priceCount, Environment.ProcessorCount, TaskPoolScheduler.Default)
@@ -119,6 +121,9 @@ IObservable<MarketPrice> AddRemovePrices(Market market, int priceCount, int para
119121
}
120122
while (adding);
121123

124+
// Wait for the source cache to finish delivering all notifications.
125+
await cacheCompleted;
126+
122127
// Verify the results
123128
CheckResultContents(_marketCacheResults, priceResults);
124129
}

src/DynamicData.Tests/Cache/MergeManyChangeSetsListFixture.cs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Reactive.Concurrency;
66
using System.Reactive.Disposables;
77
using System.Reactive.Linq;
8+
using System.Reactive.Threading.Tasks;
89
using System.Threading.Tasks;
910
using Bogus;
1011
using DynamicData.Kernel;
@@ -86,9 +87,11 @@ IObservable<Animal> AddRemoveAnimals(AnimalOwner owner, int animalCount, int par
8687
.Parallelize(animalCount, parallel, obs => obs.StressAddRemove(owner.Animals, _ => GetRemoveTime(), scheduler))
8788
.Finally(owner.Animals.Dispose);
8889

89-
var mergeAnimals = _animalOwners.Connect().MergeManyChangeSets(owner => owner.Animals.Connect());
90-
90+
var mergeAnimals = _animalOwners.Connect().MergeManyChangeSets(owner => owner.Animals.Connect()).Publish();
9191
var addingAnimals = true;
92+
var cacheCompleted = mergeAnimals.LastOrDefaultAsync().ToTask();
93+
using var animalResults = mergeAnimals.AsAggregator();
94+
using var connect = mergeAnimals.Connect();
9295

9396
// Start asynchrononously modifying the parent list and the child lists
9497
using var addAnimals = AddRemoveAnimalsStress(ownerCount, animalCount, Environment.ProcessorCount, TaskPoolScheduler.Default)
@@ -114,8 +117,12 @@ IObservable<Animal> AddRemoveAnimals(AnimalOwner owner, int animalCount, int par
114117
}
115118
while (addingAnimals);
116119

117-
// Verify the results
118-
CheckResultContents();
120+
// Wait for the source cache to finish delivering all notifications.
121+
await cacheCompleted;
122+
123+
// Verify the results against the aggregator wired into the same Publish chain
124+
// that cacheCompleted observes.
125+
CheckResultContents(_animalOwners.Items, _animalOwnerResults, animalResults);
119126
}
120127

121128
[Fact]

0 commit comments

Comments
 (0)