Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 58 additions & 18 deletions src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,28 +162,44 @@ static async ValueTask CastAndDispose(IDisposable resource)

protected virtual async Task OnEventAsync(WatchEventType eventType, TEntity entity, CancellationToken cancellationToken)
{
var deletionTrackingEntry = entity.Metadata.DeletionTimestamp is not null
? new DeletionTrackingEntry(GetDeletionCacheKey(entity), GetDeletionFingerprint(entity))
: null;

if (eventType != WatchEventType.Deleted)
{
switch (settings.ReconcileStrategy)
{
case ReconcileStrategy.ByGeneration:
// bypass generation check for finalizer handling — finalizer removal does not increment generation
if (entity.Metadata.DeletionTimestamp is null)
case ReconcileStrategy.ByGeneration when deletionTrackingEntry is not null:
var cachedDeletionFingerprint = await EntityCache.TryGetAsync<string>(
deletionTrackingEntry.CacheKey,
token: cancellationToken);

if (cachedDeletionFingerprint.HasValue && cachedDeletionFingerprint.Value == deletionTrackingEntry.Fingerprint)
{
var cachedGeneration = await EntityCache.TryGetAsync<long>(
entity.Uid(),
token: cancellationToken);

// skip reconcile if generation did not increase.
if (cachedGeneration.HasValue && cachedGeneration.Value >= entity.Generation())
{
logger
.LogDebug(
"""Entity "{Identifier}" modification did not modify generation. Skip event.""",
entity.ToIdentifierString());

return;
}
logger
.LogDebug(
"""Entity "{Identifier}" deletion state did not change. Skip event.""",
entity.ToIdentifierString());

return;
}

break;
case ReconcileStrategy.ByGeneration when deletionTrackingEntry is null:
var cachedGeneration = await EntityCache.TryGetAsync<long>(
entity.Uid(),
token: cancellationToken);

// skip reconcile if generation did not increase.
if (cachedGeneration.HasValue && cachedGeneration.Value >= entity.Generation())
{
logger
.LogDebug(
"""Entity "{Identifier}" modification did not modify generation. Skip event.""",
entity.ToIdentifierString());

return;
}

break;
Expand Down Expand Up @@ -221,27 +237,49 @@ await entityQueue
if (eventType == WatchEventType.Deleted)
{
await EntityCache.RemoveAsync(entity.Uid(), token: cancellationToken);
await EntityCache.RemoveAsync(GetDeletionCacheKey(entity), token: cancellationToken);
}
else
{
switch (settings.ReconcileStrategy)
{
case ReconcileStrategy.ByGeneration when entity.Metadata.DeletionTimestamp is null:
case ReconcileStrategy.ByGeneration when deletionTrackingEntry is not null:
await EntityCache.SetAsync(
deletionTrackingEntry.CacheKey,
deletionTrackingEntry.Fingerprint,
token: cancellationToken);

break;
case ReconcileStrategy.ByGeneration when deletionTrackingEntry is null:
await EntityCache.SetAsync(
entity.Uid(),
entity.Generation() ?? 1,
token: cancellationToken);

break;
case ReconcileStrategy.ByResourceVersion:
await EntityCache.SetAsync(
entity.Uid(),
entity.ResourceVersion(),
token: cancellationToken);

break;
}
}
}

private static string GetDeletionCacheKey(TEntity entity)
=> $"{entity.Uid()}:deletion";

private static string GetDeletionFingerprint(TEntity entity)
=> string.Join(
':',
"deleting",
entity.Metadata.DeletionTimestamp?.ToUniversalTime().ToString("O"),
entity.Metadata.DeletionGracePeriodSeconds,
entity.Generation(),
string.Join(',', entity.Finalizers() ?? []));

private async Task WatchClientEventsAsync(CancellationToken stoppingToken)
{
string? currentVersion = null;
Expand Down Expand Up @@ -349,4 +387,6 @@ e.InnerException is EndOfStreamException &&
delay.TotalSeconds);
await Task.Delay(delay);
}

private sealed record DeletionTrackingEntry(string CacheKey, string Fingerprint);
}
Original file line number Diff line number Diff line change
Expand Up @@ -303,23 +303,32 @@ public void Constructor_Should_Request_ResourceWatcherByResourceVersion_Cache_Fo
}

[Fact]
public async Task OnEvent_Should_Enqueue_When_Entity_Has_DeletionTimestamp_And_Strategy_Is_ByGeneration()
public async Task OnEvent_Should_Enqueue_When_Finalizers_Changed_During_Deletion_And_Strategy_Is_ByGeneration()
{
// Arrange
var entity = CreateTestEntity();
entity.Metadata.DeletionTimestamp = DateTime.UtcNow;
entity.Metadata.DeletionTimestamp = new DateTime(2026, 05, 28, 10, 00, 00, DateTimeKind.Utc);
entity.Metadata.DeletionGracePeriodSeconds = 30;
entity.Metadata.Finalizers = ["operator.test/second-finalizer"];

var mockCache = new Mock<IFusionCache>();
var mockQueue = new Mock<ITimedEntityQueue<V1OperatorIntegrationTestEntity>>();
var watcher = CreateTestableWatcher(cache: mockCache.Object, queue: mockQueue.Object);

mockCache
.Setup(c => c.TryGetAsync<string>(
It.Is<string>(s => s == $"{entity.Uid()}:deletion"),
It.IsAny<FusionCacheEntryOptions>(),
It.IsAny<CancellationToken>()))
.ReturnsAsync(MaybeValue<string>.FromValue("deleting:2026-05-28T10:00:00.0000000Z:30:1:operator.test/first-finalizer"));

// Act
await watcher.InvokeOnEventAsync(
WatchEventType.Modified,
entity,
TestContext.Current.CancellationToken);

// Assert – enqueued without any cache read (generation check bypassed)
// Assert
mockQueue.Verify(
q => q.Enqueue(
entity,
Expand All @@ -330,9 +339,57 @@ await watcher.InvokeOnEventAsync(
It.IsAny<CancellationToken>()),
Times.Once);
mockCache.Verify(
c => c.TryGetAsync<long>(
c => c.SetAsync(
It.Is<string>(key => key == $"{entity.Uid()}:deletion"),
It.Is<string>(token => token == "deleting:2026-05-28T10:00:00.0000000Z:30:1:operator.test/second-finalizer"),
It.IsAny<FusionCacheEntryOptions>(),
It.IsAny<IEnumerable<string>?>(),
It.IsAny<CancellationToken>()),
Times.Once);
}

[Fact]
public async Task OnEvent_Should_Skip_Enqueue_When_Deletion_Token_Is_Unchanged_And_Strategy_Is_ByGeneration()
{
// Arrange
var entity = CreateTestEntity();
entity.Metadata.DeletionTimestamp = new DateTime(2026, 05, 28, 10, 00, 00, DateTimeKind.Utc);
entity.Metadata.DeletionGracePeriodSeconds = 30;
entity.Metadata.Finalizers = ["operator.test/finalizer"];

var mockCache = new Mock<IFusionCache>();
var mockQueue = new Mock<ITimedEntityQueue<V1OperatorIntegrationTestEntity>>();
var watcher = CreateTestableWatcher(cache: mockCache.Object, queue: mockQueue.Object);

mockCache
.Setup(c => c.TryGetAsync<string>(
It.Is<string>(s => s == $"{entity.Uid()}:deletion"),
It.IsAny<FusionCacheEntryOptions>(),
It.IsAny<CancellationToken>()))
.ReturnsAsync(MaybeValue<string>.FromValue("deleting:2026-05-28T10:00:00.0000000Z:30:1:operator.test/finalizer"));

// Act
await watcher.InvokeOnEventAsync(
WatchEventType.Modified,
entity,
TestContext.Current.CancellationToken);

// Assert
mockQueue.Verify(
q => q.Enqueue(
It.IsAny<V1OperatorIntegrationTestEntity>(),
It.IsAny<ReconciliationType>(),
It.IsAny<ReconciliationTriggerSource>(),
It.IsAny<TimeSpan>(),
It.IsAny<int>(),
It.IsAny<CancellationToken>()),
Times.Never);
mockCache.Verify(
c => c.SetAsync(
It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<FusionCacheEntryOptions>(),
It.IsAny<IEnumerable<string>?>(),
It.IsAny<CancellationToken>()),
Times.Never);
}
Expand Down
Loading