Could somebody please explain to me or give me examples on how "Retry" should be set up with the resilience pipeline builder? #3959

@MaxwellAllen216

Hi,

I've been stuck on this for at least 4 hours. I just want my asynchronous command handler to use the UseResiliencePipelineAsync attribute.

The error I get indicates that the pipeline needs to be generic, most likely with the command model as the type argument, but every variation I have tried fails.

The Goal:

  • Use SQL Server for Inbox, Outbox and as the message broker/queue
  • Set up resilience pipelines for asynchronous handlers

Issues:

  • If I try to use the synchronous UseResiliencePipeline attribute, I get an error saying it cannot be used inside an asynchronous handler.

  • If I add a non-generic pipeline, I get a KeyNotFoundException indicating that it looked for a generic pipeline.

  • If I add the generic type argument, I still get the same KeyNotFoundException. This is most likely because I switched to the fluent.brighter package; the same registration worked with the regular (non-fluent) configuration. I would still like to use the fluent package if my goal can be reached with it.

KeyNotFoundException:

Paramore.Brighter.ConfigurationException: Error when building pipeline, see inner Exception for details
 ---> System.Collections.Generic.KeyNotFoundException: Unable to find a generic resilience pipeline of
'UpsertUserCommand' associated with the key 'DatabaseRetry'. Please ensure that either the generic resilience
pipeline or the generic builder is registered.
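
For context, the wording of this exception appears to come from Polly's pipeline registry, which keeps generic and non-generic pipelines as separate entries under the same key. The following is a minimal sketch of that distinction using Polly v8 directly, not the fluent.brighter API. The empty `UpsertUserCommand` class and the use of `TimeoutException` are stand-ins so the sample is self-contained, and how Brighter populates its own registry is not shown here.

```csharp
using System;
using Polly;
using Polly.Registry;
using Polly.Retry;

var registry = new ResiliencePipelineRegistry<string>();

// Non-generic registration: builds a ResiliencePipeline for the key.
registry.TryAddBuilder("DatabaseRetry", (builder, context) =>
{
    builder.AddRetry(new RetryStrategyOptions
    {
        // TimeoutException is a stand-in for DbUpdateException / SqlException.
        ShouldHandle = new PredicateBuilder().Handle<TimeoutException>(),
        MaxRetryAttempts = 3,
        Delay = TimeSpan.FromSeconds(1),
        BackoffType = DelayBackoffType.Exponential
    });
});

// Generic lookup for the same key, before any generic builder is registered.
bool foundBefore = registry.TryGetPipeline<UpsertUserCommand>("DatabaseRetry", out _);

// Generic registration: builds a ResiliencePipeline<UpsertUserCommand> for the key.
// Note the generic RetryStrategyOptions<T> and PredicateBuilder<T>.
registry.TryAddBuilder<UpsertUserCommand>("DatabaseRetry", (builder, context) =>
{
    builder.AddRetry(new RetryStrategyOptions<UpsertUserCommand>
    {
        ShouldHandle = new PredicateBuilder<UpsertUserCommand>().Handle<TimeoutException>(),
        MaxRetryAttempts = 3,
        Delay = TimeSpan.FromSeconds(1),
        BackoffType = DelayBackoffType.Exponential
    });
});

bool foundAfter = registry.TryGetPipeline<UpsertUserCommand>("DatabaseRetry", out _);

Console.WriteLine($"Generic lookup before: {foundBefore}, after: {foundAfter}");

// Placeholder type so the sketch compiles on its own (hypothetical, not the real command).
public sealed class UpsertUserCommand { }
```

If the two lookups print different results, that would suggest the generic registration is what the asynchronous attribute needs, and that the question becomes whether the fluent package's `AddResiliencePipeline<T>` ends up in the same registry that Brighter reads from.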

What I currently have:

       builder.Services
           .AddSingleton<IAmARelationalDatabaseConfiguration>(databaseConfig)
           .AddFluentBrighter(options =>
           {
               options.UseOutboxSweeper(options =>
               {
                   options.SetTimerInterval(5);
                   options.SetBatchSize(100);
                   options.SetMinimumMessageAge(TimeSpan.FromMilliseconds(500));
               });

               options.RequestHandlers(options =>
               {
                   options.FromAssemblies(builder.GetAllAssemblies());
               });

               options.RegisterServices(services =>
               {
                   services.AddTransient(typeof(ValidationHandler<>));
               });

               options.Producers(options =>
               {
                   options.UseMicrosoftSqlServerDistributedLock(databaseConfig);
                   options.UseMicrosoftSqlServerOutbox(databaseConfig);

                   options.AddMicrosoftSqlServerPublication(options =>
                   {
                       options.SetConnection(databaseConfig);

                       options.AddPublication<UpsertUserCommand>(options =>
                       {
                           options.SetQueue("upsert-user");
                           options.SetMakeChannels(OnMissingChannel.Create);
                       });
                   });
               });

               options.Subscriptions(options =>
               {
                   options.UseHandlersAsScoped();
                   options.AddMicrosoftSqlServerChannelFactory(databaseConfig);
                   options.UseMicrosoftSqlServerInbox(databaseConfig);

                   options.AddMicrosoftSqlServerSubscription<UpsertUserCommand>(options =>
                   {
                       options.SetSubscription("upsert-user");
                       options.SetQueue("upsert-user");
                       options.SetMakeChannels(OnMissingChannel.Create);
                       options.SetUnacceptableMessageLimit(5);
                       options.SetMessagePumpType(MessagePumpType.Proactor);
                   });

                   options.AddResiliencePipeline<UpsertUserCommand>("DatabaseRetry", (builder, context) =>
                   {
                       builder.AddRetry(new RetryStrategyOptions // just to note, with the fluent package, AddRetry is not generic
                       {
                           ShouldHandle = new PredicateBuilder()
                               .Handle<DbUpdateException>()
                               .Handle<SqlException>(),
                           MaxRetryAttempts = 3,
                           Delay = TimeSpan.FromSeconds(1),
                           BackoffType = DelayBackoffType.Exponential
                       });
                   });
               });

           });

       return builder;
   }

What am I doing wrong? What is the correct/latest way to add the pipeline?

Thanks for any help!
