A lightweight, high-performance in-memory event bus for .NET 10+ applications. Enables decoupled communication through the publish-subscribe pattern with built-in support for scheduled events. Perfect for domain events, CQRS, and event-driven architectures within a single application.
- Features
- Installation
- Quick Start
- Configuration
- Usage Examples
- Processing Modes
- Advanced Scenarios
- Best Practices
- OpenTelemetry Integration
- Troubleshooting
- Contributing
- ⚡ High Performance: Built on System.Threading.Channels for optimal throughput
- 🔄 Async First: Non-blocking event publishing and processing
- 📅 Event Scheduling: Schedule events to be processed at specific times
- 🛡️ Type-Safe: Strongly-typed contracts with compile-time safety
- 🔧 Flexible: Extensible architecture with custom processors
- 🧵 Concurrency Control: Built-in capacity management
- 📝 Rich Logging: Comprehensive logging for monitoring
- 🧩 DI Native: First-class dependency injection support
- 🎯 Request-Response: Support for query patterns via InvokeAsync

dotnet add package Softoverse.EventBus.InMemory

using Softoverse.EventBus.InMemory.Abstractions;
public class OrderCreatedEvent : IEvent
{
public string OrderId { get; set; }
public decimal Amount { get; set; }
public string CustomerId { get; set; }
}

public class OrderCreatedHandler : IEventHandler<OrderCreatedEvent>
{
private readonly ILogger<OrderCreatedHandler> _logger;
public OrderCreatedHandler(ILogger<OrderCreatedHandler> logger)
{
_logger = logger;
}
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct = default)
{
_logger.LogInformation("Processing order {OrderId}", @event.OrderId);
// Your business logic here
await Task.CompletedTask;
}
}

using Softoverse.EventBus.InMemory;
using Softoverse.EventBus.InMemory.Infrastructure.Processors;
var builder = WebApplication.CreateBuilder(args);
// Register with InMemoryEventProcessor (recommended for most cases)
builder.Services.AddEventBus(
builder.Configuration,
[typeof(Program).Assembly] // Assemblies containing your handlers
);
var app = builder.Build();
app.Run();

{
"EventBusSettings": {
"EventProcessorCapacity": 10
}
}

public class OrderService
{
private readonly IEventBus _eventBus;
public OrderService(IEventBus eventBus)
{
_eventBus = eventBus;
}
public async Task CreateOrderAsync(CreateOrderRequest request)
{
// Your order creation logic...
// Publish event
await _eventBus.PublishAsync(new OrderCreatedEvent
{
OrderId = order.Id,
Amount = order.Amount,
CustomerId = order.CustomerId
});
}
}

| Property | Description | Default |
|---|---|---|
| EventProcessorCapacity | Max concurrent event processors | 10 |
| ChannelCapacity | Channel buffer size (-1 = unbounded) | -1 |
| ExecuteAfterSeconds | Interval for checking scheduled events | 2 |
| RetryCount | Maximum retry attempts (for custom processors) | 10 |
| EachRetryInterval | Seconds between retries | 3 |
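
As a rough illustration of how a custom processor might honor `RetryCount` and `EachRetryInterval`, here is a minimal retry loop. `RetryHelper` and `RetryAsync` are hypothetical names and are not part of the library; only the two setting names come from the table above. The sketch assumes `RetryCount` counts retries after the first attempt.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of Softoverse.EventBus.InMemory.
public static class RetryHelper
{
    // Runs the action once, then retries up to retryCount more times,
    // waiting retryInterval between attempts. The last failure propagates.
    public static async Task RetryAsync(
        Func<CancellationToken, Task> action,
        int retryCount,
        TimeSpan retryInterval,
        CancellationToken ct = default)
    {
        for (var attempt = 0; ; attempt++)
        {
            try
            {
                await action(ct);
                return;
            }
            catch (Exception) when (attempt < retryCount && !ct.IsCancellationRequested)
            {
                // Retries remain: wait, then loop again.
                await Task.Delay(retryInterval, ct);
            }
        }
    }
}
```

Inside a custom processor this could wrap a handler call, for example `RetryHelper.RetryAsync(ct => handler.HandleAsync(@event, ct), 10, TimeSpan.FromSeconds(3))`, with the two values read from EventBusSettings.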
{
"EventBusSettings": {
"EventProcessorCapacity": 10,
"ChannelCapacity": -1,
"ExecuteAfterSeconds": 2,
"RetryCount": 10,
"EachRetryInterval": 3
}
}

await _eventBus.PublishAsync(new OrderCreatedEvent { OrderId = "123" });

var events = new List<OrderCreatedEvent>
{
new() { OrderId = "123" },
new() { OrderId = "124" },
new() { OrderId = "125" }
};
await _eventBus.BulkPublishAsync(events);

var reminderDate = DateTimeOffset.UtcNow.AddDays(7);
await _eventBus.ScheduleAsync(
new SubscriptionReminderEvent { SubscriptionId = "sub-123" },
reminderDate
);

var reminders = new List<SubscriptionReminderEvent>
{
new() { SubscriptionId = "sub-123", DaysRemaining = 7 },
new() { SubscriptionId = "sub-124", DaysRemaining = 7 }
};
var scheduleTime = DateTimeOffset.UtcNow.AddDays(7);
await _eventBus.BulkScheduleAsync(reminders, scheduleTime);

The EventBus includes comprehensive status tracking for scheduled events to prevent duplicate execution and provide visibility into event processing:
- Pending: Event is waiting to be processed
- InProgress: Event is currently being processed
- Done: Event completed successfully
- Failed: Event processing failed (with failure reason)
- Skipped: Event was skipped and won't be processed
The system automatically manages event statuses to prevent duplicate execution:
- When an event becomes due, it's immediately marked as InProgress
- If processing succeeds, it's marked as Done and removed from the store
- If processing fails, it's marked as Failed with the error message and removed
- Events stuck in InProgress for too long (default: 5 minutes) are considered stale and can be retried
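
The lifecycle above can be sketched as a small in-memory store. This is only an illustration under stated assumptions: `MiniStore`, `Entry`, `Status`, `ClaimDue`, and `Complete` are hypothetical names, and the library's ScheduledEventStore may work differently internally.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types for illustration; not the library's ScheduledEventStore.
public enum Status { Pending, InProgress, Done, Failed, Skipped }

public sealed class Entry
{
    public Guid Id { get; } = Guid.NewGuid();
    public DateTimeOffset DueAt { get; init; }
    public Status Status { get; set; } = Status.Pending;
    public DateTimeOffset StatusUpdatedAt { get; set; }
    public string? Remarks { get; set; }
}

public sealed class MiniStore
{
    private readonly object _gate = new();
    private readonly Dictionary<Guid, Entry> _entries = new();

    // Assumed default of 5 minutes, mirroring the stale timeout described above.
    public TimeSpan InProgressTimeout { get; init; } = TimeSpan.FromMinutes(5);

    public void Add(Entry entry) { lock (_gate) _entries[entry.Id] = entry; }

    // Claims every due entry that is Pending, or InProgress but stale.
    // Marking it InProgress under the lock keeps a second poll from claiming it again.
    public List<Entry> ClaimDue(DateTimeOffset now)
    {
        var claimed = new List<Entry>();
        lock (_gate)
        {
            foreach (var e in _entries.Values)
            {
                var stale = e.Status == Status.InProgress
                            && now - e.StatusUpdatedAt > InProgressTimeout;
                if (e.DueAt <= now && (e.Status == Status.Pending || stale))
                {
                    e.Status = Status.InProgress;
                    e.StatusUpdatedAt = now;
                    claimed.Add(e);
                }
            }
        }
        return claimed;
    }

    // Records the outcome (Done or Failed with a reason) and removes the entry.
    public void Complete(Entry e, Exception? error = null)
    {
        lock (_gate)
        {
            e.Status = error is null ? Status.Done : Status.Failed;
            e.Remarks = error?.Message;
            e.StatusUpdatedAt = DateTimeOffset.UtcNow;
            _entries.Remove(e.Id);
        }
    }
}
```

A polling loop would call `ClaimDue` on each tick and `Complete` after the handlers finish; an entry that is claimed but never completed becomes claimable again once the timeout has elapsed.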
You can control how long an event stays in InProgress state before being considered stale:
// In your startup/configuration
builder.Services.AddSingleton<ScheduledEventStore>(sp =>
{
var logger = sp.GetRequiredService<ILogger<ScheduledEventStore>>();
var store = new ScheduledEventStore(logger)
{
InProgressTimeoutMinutes = 10 // Default is 5 minutes
};
return store;
});

You can query events by status for monitoring or debugging:
// Get the ScheduledEventStore from DI
var store = serviceProvider.GetRequiredService<ScheduledEventStore>();
// Get all pending events
var pendingEvents = store.GetEventsByStatus(ScheduledEventStatus.Pending);
// Get all failed events to investigate issues
var failedEvents = store.GetEventsByStatus(ScheduledEventStatus.Failed);
foreach (var entry in failedEvents)
{
Console.WriteLine($"Event {entry.Id} failed: {entry.Remarks}");
}
// Get all in-progress events
var inProgressEvents = store.GetEventsByStatus(ScheduledEventStatus.InProgress);
// Manually mark an event as skipped
store.MarkAsSkipped(eventId, "Cancelled by user");
// Get total count of scheduled events
int totalEvents = store.Count;
// Get next scheduled event time
DateTimeOffset? nextEventTime = store.GetNextScheduledTime();

Problem: When debugging handlers (e.g., using breakpoints), the background service continues polling and may pick up the same event multiple times, causing duplicate execution.
Solution: Status tracking ensures:
- Events marked as InProgress won't be picked up again (unless they time out)
- Failed events are tracked with failure reasons for troubleshooting
- You can query and monitor the state of all scheduled events
- Prevents race conditions in distributed scenarios
public class ScheduledEventMonitor : BackgroundService
{
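private readonly ScheduledEventStore _store;
private readonly ILogger<ScheduledEventMonitor> _logger;
// Dependencies are supplied by the DI container
public ScheduledEventMonitor(ScheduledEventStore store, ILogger<ScheduledEventMonitor> logger)
{
_store = store;
_logger = logger;
}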
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var pending = _store.GetEventsByStatus(ScheduledEventStatus.Pending);
var inProgress = _store.GetEventsByStatus(ScheduledEventStatus.InProgress);
var failed = _store.GetEventsByStatus(ScheduledEventStatus.Failed);
_logger.LogInformation(
"Scheduled Events - Pending: {Pending}, InProgress: {InProgress}, Failed: {Failed}",
pending.Count, inProgress.Count, failed.Count);
// Alert if events are stuck in InProgress for too long
foreach (var evt in inProgress)
{
var stuckDuration = DateTimeOffset.UtcNow - evt.StatusUpdatedAt;
if (stuckDuration > TimeSpan.FromMinutes(15))
{
_logger.LogWarning(
"Event {EventId} has been InProgress for {Duration} minutes",
evt.Id, stuckDuration.TotalMinutes);
}
}
await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
}
}
}

For queries that need responses:
// 1. Define a request handler
public class GetOrderQueryHandler : IRequestHandler<GetOrderQuery, OrderDto>
{
public async Task<OrderDto> HandleAsync(GetOrderQuery query, CancellationToken ct)
{
// Fetch and return order
return await _repository.GetOrderAsync(query.OrderId);
}
}
// 2. Use InvokeAsync
var result = await _eventBus.InvokeAsync<OrderDto>(new GetOrderQuery
{
OrderId = "123"
});

// Handler 1: Send email
public class OrderEmailHandler : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
await _emailService.SendOrderConfirmationAsync(@event.CustomerId);
}
}
// Handler 2: Update inventory
public class OrderInventoryHandler : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
await _inventoryService.ReserveItemsAsync(@event.OrderId);
}
}
// Handler 3: Log audit
public class OrderAuditHandler : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
await _auditService.LogOrderCreatedAsync(@event);
}
}
// All three handlers execute in parallel when the event is published

The EventBus uses a Channel-based architecture for high-performance asynchronous processing with background workers.
Benefits:
- Non-blocking publishers (immediate return)
- High throughput via background processing
- Built-in concurrency control
- Automatic scheduling support
Characteristics:
- High event volume support
- Optimized for performance
- Eventual consistency model
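
The pattern described above (publishers that hand off to a channel, plus a fixed number of background workers) can be sketched with System.Threading.Channels alone. `MiniBus` and its members are hypothetical names, and treating `-1` as "unbounded" is an assumption based on the ChannelCapacity setting; the library's actual implementation may differ.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical class; a sketch of the pattern, not the library's implementation.
public sealed class MiniBus
{
    private readonly Channel<string> _channel;
    private readonly Task[] _workers;

    // capacity: -1 for an unbounded channel, otherwise a value >= 1.
    // workerCount plays the role of EventProcessorCapacity in this sketch.
    public MiniBus(int capacity, int workerCount, Func<string, Task> handler)
    {
        _channel = capacity == -1
            ? Channel.CreateUnbounded<string>()
            : Channel.CreateBounded<string>(new BoundedChannelOptions(capacity)
            {
                // When the buffer is full, writers wait asynchronously.
                FullMode = BoundedChannelFullMode.Wait
            });

        // Each worker drains the same channel until the writer completes.
        _workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Run(async () =>
            {
                await foreach (var item in _channel.Reader.ReadAllAsync())
                {
                    try { await handler(item); }
                    catch (Exception ex) { Console.Error.WriteLine(ex.Message); } // isolate failures
                }
            }))
            .ToArray();
    }

    // Returns as soon as the event is buffered; handlers run later on a worker.
    public ValueTask PublishAsync(string evt, CancellationToken ct = default)
        => _channel.Writer.WriteAsync(evt, ct);

    // Stops accepting events and waits for the workers to drain the channel.
    public Task StopAsync()
    {
        _channel.Writer.Complete();
        return Task.WhenAll(_workers);
    }
}
```

Because publishing only enqueues, the caller cannot assume handlers have run when `PublishAsync` returns, which is the eventual consistency trade-off listed above.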
If you need custom behavior (retry logic, circuit breakers, etc.), create a custom processor:
public class CustomEventProcessor : IEventProcessor
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger _logger;
public CustomEventProcessor(
IServiceScopeFactory scopeFactory,
ILogger<CustomEventProcessor> logger)
{
_scopeFactory = scopeFactory;
_logger = logger;
}
public async Task ProcessEventAsync(IEvent @event, CancellationToken ct)
{
// Custom pre-processing logic
_logger.LogInformation("Custom processing for {EventType}", @event.GetType().Name);
await ProcessEventHandlersAsync(@event, ct);
}
public async Task ProcessScheduledEventAsync(
IEvent @event,
DateTimeOffset scheduledTime,
CancellationToken ct)
{
// Calculate delay
var delay = scheduledTime.ToUniversalTime() - DateTimeOffset.UtcNow;
if (delay > TimeSpan.Zero)
{
await Task.Delay(delay, ct);
}
await ProcessEventHandlersAsync(@event, ct);
}
public async Task ProcessEventHandlersAsync(IEvent @event, CancellationToken ct)
{
await using var scope = _scopeFactory.CreateAsyncScope();
var handlers = scope.ServiceProvider.GetServices<IEventHandler>();
var applicableHandlers = handlers.Where(h => h.CanHandle(@event));
var tasks = applicableHandlers.Select(h =>
SafeHandleAsync(h, @event, ct));
await Task.WhenAll(tasks);
}
public async Task<TResult> InvokeAsync<TResult>(
object @event,
CancellationToken ct)
{
// Custom invoke logic
await using var scope = _scopeFactory.CreateAsyncScope();
var handlerType = typeof(IRequestHandler<,>)
.MakeGenericType(@event.GetType(), typeof(TResult));
var handler = scope.ServiceProvider.GetService(handlerType);
if (handler is IRequestHandler baseHandler)
{
var result = await baseHandler.HandleAsync(@event, ct);
return (TResult)result!;
}
return default!;
}
private async Task SafeHandleAsync(
IEventHandler handler,
IEvent @event,
CancellationToken ct)
{
try
{
await handler.HandleAsync(@event, ct);
}
catch (Exception ex)
{
_logger.LogError(ex, "Handler {Handler} failed", handler.GetType().Name);
// Don't rethrow - isolate failures
}
}
}

Register your custom processor:
builder.Services.AddEventBus<CustomEventProcessor>(
builder.Configuration,
[typeof(Program).Assembly]
);

public class OrderHandler : IEventHandler<OrderCreatedEvent>
{
private readonly IOrderRepository _repository;
private readonly IEmailService _emailService;
private readonly ILogger _logger;
public OrderHandler(
IOrderRepository repository,
IEmailService emailService,
ILogger<OrderHandler> logger)
{
_repository = repository;
_emailService = emailService;
_logger = logger;
}
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
var order = await _repository.GetByIdAsync(@event.OrderId);
if (order != null)
{
await _emailService.SendConfirmationAsync(order.CustomerId, order);
_logger.LogInformation("Order confirmation sent for {OrderId}", order.Id);
}
}
}

Handlers can implement conditional logic:
public class PremiumCustomerHandler : IEventHandler<OrderCreatedEvent>
{
private readonly ICustomerService _customerService;
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
var customer = await _customerService.GetByIdAsync(@event.CustomerId);
// Only process for premium customers
if (customer.IsPremium)
{
// Apply special premium processing
await ApplyPremiumBenefitsAsync(customer, @event);
}
}
}

Each handler should have a single responsibility:
// ✅ Good - Single responsibility
public class SendOrderEmailHandler : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
await _emailService.SendOrderConfirmationAsync(@event);
}
}
// ❌ Bad - Multiple responsibilities
public class OrderCreatedHandler : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
await _emailService.SendEmail(@event);
await _inventoryService.UpdateInventory(@event);
await _analyticsService.TrackOrder(@event);
await _loyaltyService.AddPoints(@event);
}
}

For production workloads, use Channel mode for better performance:
{
"EventBusSettings": {
"EventBusType": "Channel",
"EventProcessorCapacity": 10
}
}

Don't let one handler failure affect others:
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
try
{
await _service.ProcessOrderAsync(@event);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to process order {OrderId}", @event.OrderId);
// Consider: Dead letter queue, retry logic, alerting, etc.
}
}

// ✅ Good
public class OrderCreatedEvent : IEvent { }
public class PaymentProcessedEvent : IEvent { }
public class InventoryReservedEvent : IEvent { }
// ❌ Bad
public class Event1 : IEvent { }
public class DataChanged : IEvent { }

Use scheduling for time-based workflows:
// Schedule reminder 7 days before expiry
var reminderDate = subscription.ExpiryDate.AddDays(-7);
await _eventBus.ScheduleAsync(
new SubscriptionExpiryReminderEvent { SubscriptionId = subscription.Id },
reminderDate
);

Test with General mode for easier debugging, then use Channel mode for production:
// appsettings.Development.json
{
"EventBusSettings": {
"EventBusType": "General"
}
}
// appsettings.Production.json
{
"EventBusSettings": {
"EventBusType": "Channel",
"EventProcessorCapacity": 20
}
}

Add structured logging to track event flow:
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
using var scope = _logger.BeginScope(new Dictionary<string, object>
{
["OrderId"] = @event.OrderId,
["EventType"] = nameof(OrderCreatedEvent)
});
_logger.LogInformation("Processing order creation");
await ProcessOrderAsync(@event);
_logger.LogInformation("Order processing completed");
}

The EventBus library includes built-in support for distributed tracing using OpenTelemetry. This allows you to trace event publishing, processing, and handler execution in observability platforms like Seq, Jaeger, Zipkin, or Application Insights.
📚 For detailed documentation, examples, and troubleshooting, see OpenTelemetry Integration Guide
💡 Note: If you see detailed trace output in your console (Activity.TraceId, Activity.SpanId, etc.), this is expected when using .AddConsoleExporter(). That exporter is intended for development and debugging, and the output shows that tracing is working. For production, use a proper APM exporter instead.
The library exposes an ActivitySource named "Softoverse.EventBus.InMemory" that creates traces for all event bus operations.
# Core OpenTelemetry packages
dotnet add package OpenTelemetry.Extensions.Hosting
dotnet add package OpenTelemetry.Instrumentation.AspNetCore
dotnet add package OpenTelemetry.Instrumentation.Http # Needed for AddHttpClientInstrumentation()
# Choose your exporter (examples below)
dotnet add package OpenTelemetry.Exporter.Console # For console output
dotnet add package OpenTelemetry.Exporter.OpenTelemetryProtocol # For OTLP (Seq, etc.)
dotnet add package OpenTelemetry.Exporter.Zipkin # For Zipkin
dotnet add package OpenTelemetry.Exporter.Jaeger # For Jaeger

using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
var builder = WebApplication.CreateBuilder(args);
// Add EventBus
builder.Services.AddEventBus(builder.Configuration, [typeof(Program).Assembly]);
// Configure OpenTelemetry
builder.Services.AddOpenTelemetry()
.ConfigureResource(resource => resource
.AddService("MyApplication"))
.WithTracing(tracing =>
{
tracing
.AddAspNetCoreInstrumentation()
.AddHttpClientInstrumentation()
// 👇 Add EventBus tracing
.AddSource("Softoverse.EventBus.InMemory");
// Console exporter for development (produces detailed output in console)
if (builder.Environment.IsDevelopment())
{
tracing.AddConsoleExporter();
}
// Use your preferred production exporter (Seq, Jaeger, Application Insights, etc.)
});
var app = builder.Build();
app.Run();

// Install: dotnet add package OpenTelemetry.Exporter.OpenTelemetryProtocol
builder.Services.AddOpenTelemetry()
.ConfigureResource(resource => resource
.AddService("MyApplication"))
.WithTracing(tracing => tracing
.AddAspNetCoreInstrumentation()
.AddSource("Softoverse.EventBus.InMemory")
.AddOtlpExporter(options =>
{
options.Endpoint = new Uri("http://localhost:5341/ingest/otlp/v1/traces");
options.Protocol = OpenTelemetry.Exporter.OtlpExportProtocol.HttpProtobuf;
})
);

Seq Configuration: In Seq, go to Settings → API Keys and create an OTLP ingestion key.
// Install: dotnet add package OpenTelemetry.Exporter.Jaeger
builder.Services.AddOpenTelemetry()
.ConfigureResource(resource => resource
.AddService("MyApplication"))
.WithTracing(tracing => tracing
.AddAspNetCoreInstrumentation()
.AddSource("Softoverse.EventBus.InMemory")
.AddJaegerExporter(options =>
{
options.AgentHost = "localhost";
options.AgentPort = 6831;
})
);

// Install: dotnet add package Azure.Monitor.OpenTelemetry.Exporter
builder.Services.AddOpenTelemetry()
.ConfigureResource(resource => resource
.AddService("MyApplication"))
.WithTracing(tracing => tracing
.AddAspNetCoreInstrumentation()
.AddSource("Softoverse.EventBus.InMemory")
.AddAzureMonitorTraceExporter(options =>
{
options.ConnectionString = builder.Configuration["ApplicationInsights:ConnectionString"];
})
);

The EventBus creates traces for the following operations:
| Activity Name | Description | Tags |
|---|---|---|
| EventBus.Publish | Single event publication | eventbus.event.type, eventbus.event.id |
| EventBus.BulkPublish | Bulk event publication | eventbus.event.type, eventbus.event.count |
| EventBus.Schedule | Schedule single event | eventbus.event.type, eventbus.scheduled_time |
| EventBus.BulkSchedule | Schedule multiple events | eventbus.event.type, eventbus.event.count, eventbus.scheduled_time |
| EventBus.Invoke | Request-response invocation | eventbus.event.type, eventbus.result.type |
| EventBus.ProcessEvent | Event processing | eventbus.event.type, eventbus.processing.status |
| EventBus.HandleEvent | Individual handler execution | eventbus.event.type, eventbus.handler.type |
| EventBus.Channel.Read | Reading from channel | eventbus.channel.type (Publishing/Scheduling) |
| EventBus.Channel.Process | Channel processing | eventbus.channel.type, eventbus.processing.status |
| EventBus.ScheduledEvent.Check | Scheduled event check | eventbus.event.count, eventbus.processed.count |
Each trace includes relevant tags for filtering and analysis:
- eventbus.event.type: The name of the event type
- eventbus.event.id: The unique identifier of the event (if available)
- eventbus.event.count: Number of events in bulk operations
- eventbus.handler.type: The name of the handler processing the event
- eventbus.scheduled_time: When the event is scheduled to execute
- eventbus.processing.status: Status of the operation (success, failed, no_handlers, etc.)
- eventbus.error.type: Type of exception when operation fails
- eventbus.channel.type: Type of channel (Publishing or Scheduling)
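
To see how tags like these end up on a span, the following sketch uses only System.Diagnostics. The source name "Demo.EventBus" and the `TracingDemo` class are hypothetical, chosen so the example does not depend on the library; the tag names are taken from the list above.

```csharp
using System;
using System.Diagnostics;

public static class TracingDemo
{
    // Hypothetical source name, used so the demo is independent of the library.
    private static readonly ActivitySource Source = new("Demo.EventBus");

    // Starts an activity, tags it, and returns the event-type tag
    // as observed by a listener when the activity stops.
    public static string? PublishAndCapture(string eventType)
    {
        string? captured = null;

        // StartActivity returns null unless some listener samples the source.
        using var listener = new ActivityListener
        {
            ShouldListenTo = s => s.Name == "Demo.EventBus",
            Sample = (ref ActivityCreationOptions<ActivityContext> options)
                => ActivitySamplingResult.AllData,
            ActivityStopped = a =>
                captured = a.GetTagItem("eventbus.event.type") as string
        };
        ActivitySource.AddActivityListener(listener);

        using (var activity = Source.StartActivity("EventBus.Publish"))
        {
            // The null-conditional calls keep this safe when nothing is listening.
            activity?.SetTag("eventbus.event.type", eventType);
            activity?.SetTag("eventbus.processing.status", "success");
        }

        return captured;
    }
}
```

In an application, the OpenTelemetry SDK registers the listener for every source passed to AddSource, so handlers only need the `StartActivity` and `SetTag` calls.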
After configuring the Seq exporter:
- Publish an event in your application
- Open Seq at http://localhost:5341
- Navigate to the Traces view
- Search for traces containing EventBus.Publish
- Click on a trace to see the full span hierarchy:
EventBus.Publish (OrderCreatedEvent)
└── EventBus.ProcessEvent
├── EventBus.HandleEvent (SendOrderEmailHandler)
├── EventBus.HandleEvent (OrderInventoryHandler)
└── EventBus.HandleEvent (OrderAuditHandler)
You can add custom spans in your handlers:
using System.Diagnostics;
public class OrderCreatedHandler : IEventHandler<OrderCreatedEvent>
{
private static readonly ActivitySource ActivitySource = new("MyApplication.Handlers");
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
using var activity = ActivitySource.StartActivity("ProcessOrder");
activity?.SetTag("order.id", @event.OrderId);
activity?.SetTag("order.amount", @event.Amount);
// Your processing logic
await ProcessOrderAsync(@event);
activity?.SetStatus(ActivityStatusCode.Ok);
}
}

Remember to register your custom ActivitySource:
builder.Services.AddOpenTelemetry()
.WithTracing(tracing => tracing
.AddSource("Softoverse.EventBus.InMemory")
.AddSource("MyApplication.Handlers") // 👈 Add your custom source
.AddOtlpExporter(/* ... */)
);

Use trace tags to filter and analyze specific scenarios:
In Seq:
@Properties.eventbus.event.type = 'OrderCreatedEvent'
@Properties.eventbus.processing.status = 'failed'
In Jaeger:
- Filter by tag: eventbus.event.type=OrderCreatedEvent
- Filter by operation: EventBus.HandleEvent

- OpenTelemetry tracing has minimal performance impact (<1% in most scenarios)
- Activities are only created when an ActivityListener is registered
- Activities are only created for actual event processing; idle operations (empty channel reads, scheduled event checks with no due events) don't create traces, which avoids noise
- Use sampling in high-throughput scenarios to reduce overhead:
builder.Services.AddOpenTelemetry()
.WithTracing(tracing => tracing
.AddSource("Softoverse.EventBus.InMemory")
.SetSampler(new TraceIdRatioBasedSampler(0.1)) // Sample 10% of traces
.AddOtlpExporter()
);

- Check Configuration: Ensure EventBusSettings is in appsettings.json
- Verify Registration: Handlers must be in assemblies passed to AddEventBus()
- Check Logs: Look for errors in application logs

- Verify InMemoryEventProcessor: Use the parameterless AddEventBus() overload
- Check Time: Ensure scheduled time is in the future
- Review Logs: Check ScheduledEventProcessingHostedService logs
- Check Status: Query ScheduledEventStore.GetEventsByStatus() to see if events are stuck in InProgress or Failed state

- Status Tracking: The system automatically marks events as InProgress to prevent duplicate execution
- Timeout Configuration: If events take longer to process, increase InProgressTimeoutMinutes on ScheduledEventStore
- Check for Errors: Review failed events using GetEventsByStatus(ScheduledEventStatus.Failed) to identify issues

- Increase Capacity: Adjust EventProcessorCapacity based on load
- Use Channel Mode: Ensure you are using "Channel" mode for production
- Optimize Handlers: Profile slow handlers and optimize them

- Architecture Documentation: See ARCHITECTURE.md for detailed implementation details
- GitHub Repository: https://github.com/softoverse/EventBus
- NuGet Package: https://www.nuget.org/packages/Softoverse.EventBus.InMemory/
Contributions are welcome! Please see ARCHITECTURE.md to understand the codebase, then:
- Fork the repository
- Create a feature branch
- Make your changes
- Submit a pull request
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
Made with ❤️ by Softoverse