[Bug Report]: Manually committing offsets not working as expected #655

@kevin-mcmanus

Prerequisites

  • I have searched issues to ensure it has not already been reported

Description

I'm using version 4.0 and would like to control when offsets are written back to the broker.

As a simplified example I would like to process a batch of 1000 messages, and commit the offset of the last message in the batch.

These docs describe how to turn off the automatic mode, but they don't explain how to choose which offset gets committed.

I've experimented with various combinations of context.ConsumerContext.Complete() and context.ConsumerContext.ShouldStoreOffset = true, but haven't found one that works. Could you advise what I might be missing?
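To make the intended batching rule concrete, here is a minimal sketch that is independent of KafkaFlow. BatchCommitTracker and RecordProcessed are hypothetical names invented for this illustration, and the sketch assumes a single worker reading a single partition whose offsets start at 0. It only decides when a commit should happen; it does not call any KafkaFlow API.

```csharp
using System;

// Hypothetical helper, not part of KafkaFlow: counts processed messages and
// signals when the last message of a batch has been reached.
public sealed class BatchCommitTracker
{
    private readonly int batchSize;
    private int processedInBatch;

    public BatchCommitTracker(int batchSize)
    {
        if (batchSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(batchSize));
        }

        this.batchSize = batchSize;
    }

    // Call once per processed message. Returns true only for the message
    // that completes a batch, then starts counting the next batch.
    public bool RecordProcessed()
    {
        processedInBatch++;
        if (processedInBatch < batchSize)
        {
            return false;
        }

        processedInBatch = 0;
        return true;
    }
}

public static class Program
{
    public static void Main()
    {
        var tracker = new BatchCommitTracker(batchSize: 1000);

        // Simulated offsets 0..2499: the commit points are 999 and 1999;
        // the trailing 500 messages do not fill a batch.
        for (long offset = 0; offset < 2500; offset++)
        {
            if (tracker.RecordProcessed())
            {
                Console.WriteLine($"Would commit offset {offset}");
            }
        }
    }
}
```

Counting processed messages rather than testing Offset % 1000 keeps the batch size fixed even when the consumer starts from an offset that is not a multiple of 1000.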

Steps to reproduce

Consumer config:

    .AddConsumer(consumer => consumer
        .Topic("sample-topic")
        .WithName("input")
        .WithGroupId("service1")
        .WithAutoOffsetReset(KafkaFlow.AutoOffsetReset.Earliest)
        .WithBufferSize(100)
        .WithWorkersCount(1)
        .WithManualMessageCompletion()
        //.WithoutStoringOffsets()
        //.WithAutoCommitIntervalMs(0)
        .AddMiddlewares(middleware =>
        {
            middleware.Add<CommitMiddleware>();
        })
    )

Middleware

    using System;
    using System.Threading.Tasks;
    using KafkaFlow;

    internal class CommitMiddleware : IMessageMiddleware
    {
        public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
        {
            Console.WriteLine($"Processing offset {context.ConsumerContext.Offset}");

            await next(context);

            // By default, do not store the offset of this message.
            context.ConsumerContext.ShouldStoreOffset = false;

            // At every offset that is a multiple of 1000, try to store and commit it.
            if (context.ConsumerContext.Offset % 1000 == 0)
            {
                context.ConsumerContext.ShouldStoreOffset = true;

                context.ConsumerContext.Complete();
                Console.WriteLine($"Committing offset {context.ConsumerContext.Offset}");
            }

            // Also tried: completing every message unconditionally.
            //context.ConsumerContext.Complete();
        }
    }

Expected behavior

Every call to Complete() updates the offset stored on the broker for the consumer group.

Actual behavior

A single offset is registered on the broker after the first call to Complete(). The consumer group continues to report the same offset number no matter how many times the middleware calls Complete() again.

KafkaFlow version

4.0.0
