-
Notifications
You must be signed in to change notification settings - Fork 87
feat(controllers): Multi-controller/-finalizer dispatch via ShouldHandle(TEntity)
#1084
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 4 commits
e304e28
acdbe13
1a87f17
e93746d
93314b2
5fabcd5
49f82e5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -46,7 +46,10 @@ public IOperatorBuilder AddController<TImplementation, TEntity>() | |
| where TImplementation : class, IEntityController<TEntity> | ||
| where TEntity : IKubernetesObject<V1ObjectMeta> | ||
| { | ||
| Services.TryAddScoped<IEntityController<TEntity>, TImplementation>(); | ||
| // TryAddEnumerable dedupes by (ServiceType, ImplementationType), so calling AddController | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not sure this AI comment is really needed 😄 |
||
| // with the same TImplementation twice registers it only once — while still allowing | ||
| // distinct implementations to coexist for the same TEntity. | ||
| Services.TryAddEnumerable(ServiceDescriptor.Scoped<IEntityController<TEntity>, TImplementation>()); | ||
| Services.TryAddSingleton<IReconciler<TEntity>, Reconciler<TEntity>>(); | ||
|
stevefan1999-personal marked this conversation as resolved.
|
||
|
|
||
| // Requeue | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -6,6 +6,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||
| using k8s.Models; | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| using KubeOps.Abstractions.Builder; | ||||||||||||||||||||||||||||||||||||||||||||||||
| using KubeOps.Abstractions.Entities; | ||||||||||||||||||||||||||||||||||||||||||||||||
| using KubeOps.Abstractions.Reconciliation; | ||||||||||||||||||||||||||||||||||||||||||||||||
| using KubeOps.Abstractions.Reconciliation.Controller; | ||||||||||||||||||||||||||||||||||||||||||||||||
| using KubeOps.Abstractions.Reconciliation.Finalizer; | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -115,8 +116,11 @@ await entityQueue | |||||||||||||||||||||||||||||||||||||||||||||||
| cancellationToken); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| await using var scope = serviceProvider.CreateAsyncScope(); | ||||||||||||||||||||||||||||||||||||||||||||||||
| var controller = scope.ServiceProvider.GetRequiredService<IEntityController<TEntity>>(); | ||||||||||||||||||||||||||||||||||||||||||||||||
| var result = await controller.DeletedAsync(reconciliationContext.Entity, cancellationToken); | ||||||||||||||||||||||||||||||||||||||||||||||||
| var result = await DispatchToMatchingControllers( | ||||||||||||||||||||||||||||||||||||||||||||||||
| scope.ServiceProvider, | ||||||||||||||||||||||||||||||||||||||||||||||||
| reconciliationContext.Entity, | ||||||||||||||||||||||||||||||||||||||||||||||||
| (ctrl, entity, ct) => ctrl.DeletedAsync(entity, ct), | ||||||||||||||||||||||||||||||||||||||||||||||||
| cancellationToken); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| if (result.IsSuccess) | ||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -139,19 +143,80 @@ await entityQueue | |||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||
| var finalizers = scope.ServiceProvider.GetKeyedServices<IEntityFinalizer<TEntity>>(KeyedService.AnyKey); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| var anyFinalizerAdded = finalizers | ||||||||||||||||||||||||||||||||||||||||||||||||
| .Aggregate( | ||||||||||||||||||||||||||||||||||||||||||||||||
| false, | ||||||||||||||||||||||||||||||||||||||||||||||||
| (changed, finalizer) => entity.AddFinalizer(finalizer.GetIdentifierName(entity)) || changed); | ||||||||||||||||||||||||||||||||||||||||||||||||
| var anyFinalizerAdded = false; | ||||||||||||||||||||||||||||||||||||||||||||||||
| foreach (var finalizer in finalizers) | ||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||
| cancellationToken.ThrowIfCancellationRequested(); | ||||||||||||||||||||||||||||||||||||||||||||||||
| if (!await finalizer.ShouldHandle(entity)) | ||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||
| continue; | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| anyFinalizerAdded = entity.AddFinalizer(finalizer.GetIdentifierName(entity)) || anyFinalizerAdded; | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
|
stevefan1999-personal marked this conversation as resolved.
|
||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| if (anyFinalizerAdded) | ||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||
| entity = await client.UpdateAsync(entity, cancellationToken); | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| var controller = scope.ServiceProvider.GetRequiredService<IEntityController<TEntity>>(); | ||||||||||||||||||||||||||||||||||||||||||||||||
| return await controller.ReconcileAsync(entity, cancellationToken); | ||||||||||||||||||||||||||||||||||||||||||||||||
| return await DispatchToMatchingControllers( | ||||||||||||||||||||||||||||||||||||||||||||||||
| scope.ServiceProvider, | ||||||||||||||||||||||||||||||||||||||||||||||||
| entity, | ||||||||||||||||||||||||||||||||||||||||||||||||
| (ctrl, e, ct) => ctrl.ReconcileAsync(e, ct), | ||||||||||||||||||||||||||||||||||||||||||||||||
| cancellationToken); | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| /// <summary> | ||||||||||||||||||||||||||||||||||||||||||||||||
| /// Gets all <see cref="IEntityController{TEntity}"/> registrations whose <see cref="IEntityController{TEntity}.ShouldHandle"/> | ||||||||||||||||||||||||||||||||||||||||||||||||
| /// returns <c>true</c> for the given entity, then calls <paramref name="operation"/> on each in registration order. | ||||||||||||||||||||||||||||||||||||||||||||||||
| /// On the first failure the chain is short-circuited and that failure result is returned. | ||||||||||||||||||||||||||||||||||||||||||||||||
| /// If no controller is registered at all the result is a configuration-error failure; if controllers are | ||||||||||||||||||||||||||||||||||||||||||||||||
| /// registered but none claim responsibility, a success result is returned and a warning is logged. | ||||||||||||||||||||||||||||||||||||||||||||||||
| /// </summary> | ||||||||||||||||||||||||||||||||||||||||||||||||
| private async Task<ReconciliationResult<TEntity>> DispatchToMatchingControllers( | ||||||||||||||||||||||||||||||||||||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. to be honest I am pretty unsure about the possibility to have more than 1 controller being responsible for an entity - shouldn't the pattern be more like:
|
||||||||||||||||||||||||||||||||||||||||||||||||
| IServiceProvider services, | ||||||||||||||||||||||||||||||||||||||||||||||||
| TEntity entity, | ||||||||||||||||||||||||||||||||||||||||||||||||
| Func<IEntityController<TEntity>, TEntity, CancellationToken, Task<ReconciliationResult<TEntity>>> operation, | ||||||||||||||||||||||||||||||||||||||||||||||||
| CancellationToken cancellationToken) | ||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||
| var registeredControllers = services.GetServices<IEntityController<TEntity>>().ToList(); | ||||||||||||||||||||||||||||||||||||||||||||||||
| if (registeredControllers.Count == 0) | ||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||
| return ReconciliationResult<TEntity>.Failure( | ||||||||||||||||||||||||||||||||||||||||||||||||
| entity, | ||||||||||||||||||||||||||||||||||||||||||||||||
| $"No IEntityController<{typeof(TEntity).Name}> registered. Did you forget to call AddController<T, TEntity>() on the operator builder?"); | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+193
to
+199
|
||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| var responsibleControllers = new List<IEntityController<TEntity>>(); | ||||||||||||||||||||||||||||||||||||||||||||||||
| foreach (var controller in registeredControllers) | ||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||
| cancellationToken.ThrowIfCancellationRequested(); | ||||||||||||||||||||||||||||||||||||||||||||||||
| if (await controller.ShouldHandle(entity)) | ||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||
| responsibleControllers.Add(controller); | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| if (responsibleControllers.Count == 0) | ||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||
| logger.LogWarning( | ||||||||||||||||||||||||||||||||||||||||||||||||
| """No responsible controller found for "{Kind}/{Name}". Skipping.""", | ||||||||||||||||||||||||||||||||||||||||||||||||
| entity.Kind, | ||||||||||||||||||||||||||||||||||||||||||||||||
| entity.Name()); | ||||||||||||||||||||||||||||||||||||||||||||||||
| return ReconciliationResult<TEntity>.Success(entity); | ||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
|
stevefan1999-personal marked this conversation as resolved.
Outdated
|
||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| ReconciliationResult<TEntity> result = ReconciliationResult<TEntity>.Success(entity); | ||||||||||||||||||||||||||||||||||||||||||||||||
| foreach (var controller in responsibleControllers) | ||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||
| cancellationToken.ThrowIfCancellationRequested(); | ||||||||||||||||||||||||||||||||||||||||||||||||
| result = await operation(controller, result.Entity, cancellationToken); | ||||||||||||||||||||||||||||||||||||||||||||||||
| if (!result.IsSuccess) return result; | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||
| return result; | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
| foreach (var controller in responsibleControllers) | |
| { | |
| cancellationToken.ThrowIfCancellationRequested(); | |
| result = await operation(controller, result.Entity, cancellationToken); | |
| if (!result.IsSuccess) return result; | |
| } | |
| return result; | |
| var requeueAfter = result.RequeueAfter; | |
| foreach (var controller in responsibleControllers) | |
| { | |
| cancellationToken.ThrowIfCancellationRequested(); | |
| result = await operation(controller, result.Entity, cancellationToken); | |
| if (!result.IsSuccess) return result; | |
| if (result.RequeueAfter is not null && | |
| (requeueAfter is null || result.RequeueAfter < requeueAfter)) | |
| { | |
| requeueAfter = result.RequeueAfter; | |
| } | |
| } | |
| return ReconciliationResult<TEntity>.Success(result.Entity, requeueAfter); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -127,6 +127,67 @@ public void Should_Add_Leader_Elector() | |
| s.Lifetime == ServiceLifetime.Singleton); | ||
| } | ||
|
|
||
| [Fact] | ||
| public void Should_Allow_Multiple_Controllers_For_Same_Entity_Type() | ||
| { | ||
| _builder.AddController<TestController, V1OperatorIntegrationTestEntity>(); | ||
| _builder.AddController<SecondTestController, V1OperatorIntegrationTestEntity>(); | ||
|
|
||
| var registrations = _builder.Services | ||
| .Where(s => | ||
| s.ServiceType == typeof(IEntityController<V1OperatorIntegrationTestEntity>) && | ||
| s.Lifetime == ServiceLifetime.Scoped) | ||
| .ToList(); | ||
|
|
||
| registrations.Should().HaveCount(2); | ||
| registrations.Should().Contain(s => s.ImplementationType == typeof(TestController)); | ||
| registrations.Should().Contain(s => s.ImplementationType == typeof(SecondTestController)); | ||
| } | ||
|
|
||
| [Fact] | ||
| public void Should_Dedupe_Identical_Controller_Registrations() | ||
| { | ||
| _builder.AddController<TestController, V1OperatorIntegrationTestEntity>(); | ||
| _builder.AddController<TestController, V1OperatorIntegrationTestEntity>(); | ||
|
|
||
| var registrations = _builder.Services | ||
| .Where(s => s.ServiceType == typeof(IEntityController<V1OperatorIntegrationTestEntity>)) | ||
| .ToList(); | ||
|
|
||
| registrations.Should().HaveCount(1); | ||
| registrations.Should().ContainSingle(s => s.ImplementationType == typeof(TestController)); | ||
| } | ||
|
|
||
| [Fact] | ||
| public void Should_Resolve_All_Controllers_For_Same_Entity_Type() | ||
| { | ||
| _builder.AddController<TestController, V1OperatorIntegrationTestEntity>(); | ||
| _builder.AddController<SecondTestController, V1OperatorIntegrationTestEntity>(); | ||
|
|
||
| var provider = _builder.Services.BuildServiceProvider(); | ||
| var controllers = provider | ||
| .GetServices<IEntityController<V1OperatorIntegrationTestEntity>>() | ||
| .ToList(); | ||
|
|
||
| controllers.Should().HaveCount(2); | ||
| controllers.Should().ContainItemsAssignableTo<IEntityController<V1OperatorIntegrationTestEntity>>(); | ||
| controllers.Select(c => c.GetType()).Should().Contain(typeof(TestController)); | ||
| controllers.Select(c => c.GetType()).Should().Contain(typeof(SecondTestController)); | ||
|
||
| } | ||
|
|
||
| [Fact] | ||
| public void Should_Not_Register_Duplicate_ResourceWatcher_For_Multiple_Controllers() | ||
| { | ||
| _builder.AddController<TestController, V1OperatorIntegrationTestEntity>(); | ||
| _builder.AddController<SecondTestController, V1OperatorIntegrationTestEntity>(); | ||
|
|
||
| _builder.Services | ||
| .Where(s => | ||
| s.ServiceType == typeof(IHostedService) && | ||
| s.ImplementationType == typeof(ResourceWatcher<V1OperatorIntegrationTestEntity>)) | ||
| .Should().HaveCount(1); | ||
| } | ||
|
|
||
| [Fact] | ||
| public void Should_Add_LeaderAwareResourceWatcher() | ||
| { | ||
|
|
@@ -152,6 +213,15 @@ public Task<ReconciliationResult<V1OperatorIntegrationTestEntity>> DeletedAsync( | |
| Task.FromResult(ReconciliationResult<V1OperatorIntegrationTestEntity>.Success(entity)); | ||
| } | ||
|
|
||
| private sealed class SecondTestController : IEntityController<V1OperatorIntegrationTestEntity> | ||
| { | ||
| public Task<ReconciliationResult<V1OperatorIntegrationTestEntity>> ReconcileAsync(V1OperatorIntegrationTestEntity entity, CancellationToken cancellationToken) => | ||
| Task.FromResult(ReconciliationResult<V1OperatorIntegrationTestEntity>.Success(entity)); | ||
|
|
||
| public Task<ReconciliationResult<V1OperatorIntegrationTestEntity>> DeletedAsync(V1OperatorIntegrationTestEntity entity, CancellationToken cancellationToken) => | ||
| Task.FromResult(ReconciliationResult<V1OperatorIntegrationTestEntity>.Success(entity)); | ||
| } | ||
|
|
||
| private sealed class TestFinalizer : IEntityFinalizer<V1OperatorIntegrationTestEntity> | ||
| { | ||
| public Task<ReconciliationResult<V1OperatorIntegrationTestEntity>> FinalizeAsync(V1OperatorIntegrationTestEntity entity, CancellationToken cancellationToken) => | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.