Vesper Hivemind - Distributed Job Queue System (V2)

A production-ready, distributed job queue system built in Go, inspired by Celery. V2 delivers enterprise-grade features including priority-based processing, burst job handling, and multi-machine coordination.

🚀 Features (V2)

Core Job Processing

  • Multi-worker Processing: Configurable concurrent workers per machine
  • Dual Database Support: PostgreSQL for production, SQLite for development
  • Lease-based Recovery: Automatic handling of worker failures
  • Retry Handling: Smart retry logic with exponential backoff
  • Idempotency Support: Duplicate job prevention with custom keys

Priority & Performance

  • 🎯 Priority-based Processing: 4-level priority system (0-10 scale)
    • Critical (10): Fraud detection, urgent alerts
    • High (7): User-facing operations, notifications
    • Normal (3): Regular business operations
    • Background (0): Cleanup, backups, maintenance
  • 🚀 Burst Job Handling: Handle 10k+ concurrent job submissions
  • ⚡ High Throughput: Process 100+ jobs/second per machine

Distributed Architecture

  • 🌐 Multi-Machine Coordination: Automatic worker discovery across machines
  • 💓 Heartbeat System: Real-time health monitoring and dead worker cleanup
  • 🔄 Dynamic Fleet Management: Workers join/leave without downtime
  • 📊 Fleet Visibility: Real-time monitoring via /workers API
  • 🛡️ Graceful Shutdown: Clean process termination with proper cleanup

Production Features

  • 📈 Prometheus Metrics: Built-in observability and monitoring
  • 🔌 HTTP Producer API: RESTful job submission and management
  • 🏥 Health Checks: /health and /ready endpoints
  • 📋 Job Tracking: Complete job lifecycle visibility
  • ⏱️ Performance Logging: Per-job timing and fleet statistics

🚧 Coming Soon

  • 📊 Grafana Dashboards: Complete observability stack

📋 Architecture Overview

```mermaid
graph TB
    subgraph "Vesper (lib) V2 - Distributed"
        subgraph "Machine A"
            A1[HTTP API :8080] --> F[(SQLite Database)]
            F --> C1[Worker 1-5]
        end
        subgraph "Machine B"
            A2[HTTP API :8081] --> F
            F --> C2[Worker 6-10]
        end
        subgraph "Machine C"
            A3[HTTP API :8082] --> F
            F --> C3[Worker 11-15]
        end
        WR[Worker Registry] --> F
    end
    H[User/API] --> A1
    H --> A2
    H --> A3

    style A1 fill:#e1f5fe
    style A2 fill:#e1f5fe
    style A3 fill:#e1f5fe
    style C1 fill:#e8f5e8
    style C2 fill:#e8f5e8
    style C3 fill:#e8f5e8
    style F fill:#fff3e0
    style WR fill:#f3e5f5
```

🔄 Job Workflow

```mermaid
sequenceDiagram
    participant P as Producer
    participant W as Worker
    participant DB as Database

    Note over P,DB: Job Submission Flow
    P->>DB: 1. Create job (status: pending)

    Note over DB: Job Processing Flow
    W->>DB: 2. Atomically claim job (pending -> processing)
    W->>W: 3. Execute job logic

    alt Job Success
        W->>DB: 4a. Update status (done)
    else Job Failure
        W->>DB: 4b. Update status (failed)
    end
```

🏗️ Job Lifecycle States

```mermaid
stateDiagram-v2
    [*] --> pending: Job Created
    pending --> processing: Worker Claims Job
    processing --> done: Job Completed Successfully
    processing --> failed: Job Failed/Error
    done --> [*]
    failed --> [*]

    note right of pending: Job waiting in queue
    note right of processing: Worker executing job
    note right of done: Job completed successfully
    note right of failed: Job failed with error
```

🛠️ Installation & Setup

Prerequisites

  • Go 1.19 or higher
  • Database: PostgreSQL (production) or SQLite (development)

Install Library

go get github.com/akshit-git24/vesper-hivemind/vesper/v2@v2.1.0

Run the Example App

The repo root includes a minimal `main.go` that registers a consumer and calls `vesper.Run()`.

Local Development Against ./vesper

For local development, create a workspace file so the root app uses the local ./vesper module instead of the published release:

cp go.work.example go.work

go.work is intentionally gitignored, so GitHub CI and production builds stay independent and resolve the released vesper/v2 version from go.mod.

  1. Clone the repository

    git clone <repository-url>
    cd main-hivemind
  2. Install dependencies

    go mod tidy
  3. Configure environment

    cp .env.example .env
    # Edit .env with your settings
  4. Run the app

    go run .

✍️ Add Your Business Logic

Register a consumer function in your own main.go and the library will call it for every job:

```go
package main

import (
	"github.com/akshit-git24/vesper-hivemind/vesper/v2"
	"github.com/joho/godotenv"
)

func main() {
	godotenv.Load()
	vesper.RegisterConsumer(func(job vesper.Job) error {
		// your business logic here
		return nil
	})
	vesper.Run()
}
```

⚙️ Configuration

Environment Variables

# Database Configuration
DB_HOST="localhost"         # PostgreSQL host (if provided, uses PostgreSQL)
DB_PORT="5432"             # PostgreSQL port
DB_USER="vesper"           # PostgreSQL username
DB_PASSWORD="secure123"    # PostgreSQL password
DB_NAME="vesper"           # PostgreSQL database name
DB_SSLMODE="disable"       # PostgreSQL SSL mode
DB_TIMEZONE="UTC"          # PostgreSQL timezone

# SQLite Fallback (used when DB_HOST is not provided)
DB_PATH="sqlite.db"        # SQLite database file path

# Application Configuration
WORKERS="5"                # number of concurrent workers
API_ADDR=":8080"          # http server address
APP_ENV="development"      # or "production"
REQUIRE_CONSUMER="0"       # set "1" to require a registered consumer at startup

# HTTP Timeouts
HTTP_READ_HEADER_TIMEOUT="5s"   # http read header timeout
HTTP_READ_TIMEOUT="10s"         # http read timeout
HTTP_WRITE_TIMEOUT="10s"        # http write timeout
HTTP_IDLE_TIMEOUT="60s"         # http idle timeout

Environment behavior:

| APP_ENV | REQUIRE_CONSUMER | Result |
|---|---|---|
| production | anything | Require consumer |
| development | 1 | Require consumer |
| development | 0 / empty | Allow startup |

Database Selection Logic

| Configuration | Database Used | Use Case |
|---|---|---|
| DB_HOST provided | PostgreSQL | Production, multi-machine deployment |
| DB_HOST not set | SQLite | Development, single-machine testing |

Current Defaults (in code)

  • Workers: 5
  • PostgreSQL: Auto-detected when DB_HOST is provided
  • SQLite fallback: Vesper_sqlite.db

🧪 Load Testing & Performance

Vesper includes production-ready load testing scripts to validate burst performance:

Burst Testing Scripts

# Standard burst test (100 jobs)
.\test-burst.ps1

# High-load test (1000 jobs, 25 concurrent)
.\test-burst.ps1 -JobCount 1000 -ThrottleLimit 25

# Extreme load test (10,000 jobs)
.\test-10k-burst.ps1

# Custom test with detailed output
.\test-10k-burst.ps1 -JobCount 5000 -DetailedOutput

Performance Benchmarks

  • Throughput: 100+ jobs/second per machine
  • Burst Capacity: Handle 10k+ concurrent submissions
  • Latency: Sub-100ms job submission response times
  • Scalability: Linear scaling with additional machines

Load Test Features

  • Realistic Job Distribution: Mixed priority levels and job types
  • Concurrent Submission: Parallel HTTP requests with throttling
  • Performance Metrics: Throughput, success rates, timing analysis
  • Error Handling: Retry logic with exponential backoff
  • Comprehensive Reporting: Detailed statistics and performance ratings

🌐 Distributed Deployment

Single Machine, Multiple Processes

# Terminal 1
WORKERS=10 API_ADDR=:8080 go run .

# Terminal 2
WORKERS=15 API_ADDR=:8081 go run .

# Terminal 3
WORKERS=5 API_ADDR=:8082 go run .

# Check fleet status
curl http://localhost:8080/workers

Multi-Machine Deployment (Production-Ready)

Automatic PostgreSQL Detection: When DB_HOST is provided, Vesper automatically uses PostgreSQL for true distributed deployment.

# Setup shared PostgreSQL database
docker run -d --name postgres \
  -e POSTGRES_DB=vesper \
  -e POSTGRES_USER=vesper \
  -e POSTGRES_PASSWORD=secure123 \
  -p 5432:5432 postgres:15

# Machine A (192.168.1.100)
DB_HOST=192.168.1.100 DB_USER=vesper DB_PASSWORD=secure123 WORKERS=20 go run .

# Machine B (192.168.1.101)
DB_HOST=192.168.1.100 DB_USER=vesper DB_PASSWORD=secure123 WORKERS=15 go run .

# Machine C (192.168.1.102)
DB_HOST=192.168.1.100 DB_USER=vesper DB_PASSWORD=secure123 WORKERS=10 go run .

# Monitor distributed fleet
curl http://192.168.1.100:8080/workers

Result: 45 total workers across 3 machines processing jobs from shared PostgreSQL database with automatic load balancing and failover.

Fleet Management

# View all workers
curl http://localhost:8080/workers | jq '.'

# Check total capacity
curl http://localhost:8080/workers | jq '.total_workers'

# Monitor by machine
curl http://localhost:8080/workers | jq '.machines'

# Graceful shutdown (Ctrl+C or SIGTERM)
kill -TERM <process_id>

📊 Job Structure

{
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "type": "resize_image",
  "priority": 7,
  "payload": {
    "image_url": "https://example.com/cat.png",
    "width": 640,
    "height": 480
  },
  "status": "pending",
  "retries": 0,
  "error": "",
  "created_at": "2024-01-15T10:30:00Z",
  "updated_at": "2024-01-15T10:30:00Z"
}

Priority Levels

  • 0: Background tasks (default) - cleanup, backups
  • 3: Regular tasks - normal business operations
  • 7: Important tasks - user-facing operations
  • 10: Critical tasks - urgent alerts, fraud detection

Note: Any integer 0-10 is valid. Higher numbers = higher priority.
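The tiers above can be captured as named constants; the names and the range check below are illustrative conveniences, not part of Vesper's API:

```go
package main

import "fmt"

// Named constants for the documented priority tiers; any integer 0-10
// is accepted by the API, these names are just a convention.
const (
	PriorityBackground = 0
	PriorityNormal     = 3
	PriorityHigh       = 7
	PriorityCritical   = 10
)

// validPriority reports whether p falls in the documented 0-10 range.
func validPriority(p int) bool { return p >= 0 && p <= 10 }

func main() {
	fmt.Println(PriorityCritical > PriorityHigh) // true: higher number wins
	fmt.Println(validPriority(11))               // false: out of range
}
```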


### Job Status Types
- `pending`: Job created and waiting for processing
- `processing`: Job currently being executed by a worker
- `done`: Job completed successfully
- `failed`: Job failed during execution
- `dlq`: Job moved to dead-letter queue after max retries
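These statuses form the lifecycle shown in the state diagram earlier. A transition table like the one below captures it; note that the `failed → pending` (retry) and `failed → dlq` edges are inferred from the retry and dead-letter descriptions in this README, not copied from Vesper's source:

```go
package main

import "fmt"

// transitions encodes the documented job lifecycle. The retry edge
// (failed -> pending) and DLQ edge are assumptions consistent with the
// status list above.
var transitions = map[string][]string{
	"pending":    {"processing"},
	"processing": {"done", "failed"},
	"failed":     {"pending", "dlq"}, // retry with backoff, or dead-letter
}

// canTransition reports whether moving from one status to another is
// allowed by the table above.
func canTransition(from, to string) bool {
	for _, s := range transitions[from] {
		if s == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition("pending", "processing")) // true
	fmt.Println(canTransition("done", "pending"))       // false: done is terminal
}
```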

## 🔧 Core Components

### Producer (HTTP)
- Accepts `POST /jobs`
- Creates job records in database

### Consumer (Workers)
- Multiple concurrent workers (configurable)
- Pulls jobs from DB with atomic claim + lease
- Updates job status during processing
- Executes job logic with error handling
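The atomic claim can be expressed as a single UPDATE. The SQL below is an illustrative PostgreSQL pattern (`FOR UPDATE SKIP LOCKED` plus a lease timestamp) consistent with the lease-based design described here, not Vesper's literal query; the column names are assumptions:

```go
package main

import "fmt"

// claimSQL sketches an atomic claim: one UPDATE flips a single pending
// job to processing and stamps a lease, so two workers can never claim
// the same row. SKIP LOCKED lets concurrent workers pass over rows that
// another transaction is already claiming.
const claimSQL = `
UPDATE jobs
SET status = 'processing',
    lease_expires_at = now() + interval '30 seconds'
WHERE job_id = (
    SELECT job_id FROM jobs
    WHERE status = 'pending'
    ORDER BY priority DESC, created_at ASC
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
RETURNING job_id;`

func main() { fmt.Println(claimSQL) }
```

The `ORDER BY priority DESC, created_at ASC` clause is what yields the documented behavior: highest priority first, FIFO within a level.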

### Database Layer
- **PostgreSQL**: Production-grade database with optimized indexes and JSONB support
- **SQLite**: Development fallback with automatic detection
- **GORM ORM**: Database operations with automatic schema migration
- **Job status tracking**: Complete lifecycle management
- **Worker registration**: Distributed heartbeat and coordination system

## 🚦 API Usage

### Example Usage

```bash
# Background task (priority 0 - default)
curl -X POST http://localhost:8080/jobs \
  -H "Content-Type: application/json" \
  -d '{
    "type": "cleanup_temp_files",
    "priority": 0,
    "payload": {"directory": "/tmp"}
  }'

# Regular task (priority 3)
curl -X POST http://localhost:8080/jobs \
  -H "Content-Type: application/json" \
  -d '{
    "type": "resize_image",
    "priority": 3,
    "payload": {"image_url": "cat.png", "width": 640}
  }'

# Important task (priority 7)
curl -X POST http://localhost:8080/jobs \
  -H "Content-Type: application/json" \
  -d '{
    "type": "send_welcome_email",
    "priority": 7,
    "payload": {"user_id": 123, "email": "user@example.com"}
  }'

# Critical task (priority 10)
curl -X POST http://localhost:8080/jobs \
  -H "Content-Type: application/json" \
  -d '{
    "type": "fraud_alert",
    "priority": 10,
    "payload": {"transaction_id": "tx_456", "risk_score": 0.95}
  }'

```

Processing Order: Critical tasks (10) → Important tasks (7) → Regular tasks (3) → Background tasks (0)

Within the same priority level, jobs are processed FIFO (first-in, first-out).

API Endpoints

POST /jobs (optional header: Idempotency-Key: <your-key>)

{
  "type": "resize",
  "priority": 7,
  "payload": {
    "image_url": "https://example.com/cat.png",
    "width": 640,
    "height": 480
  }
}

Response: 201 Created with the created job JSON.

GET /jobs/{id} Returns 200 OK with the job JSON or 404 Not Found.

GET /workers Returns JSON with all active workers across machines and fleet statistics.

GET /health Returns 200 OK.

GET /ready Returns 200 OK if DB is reachable.

GET /metrics Returns Prometheus text format for scraping.

GET /stats Returns JSON counters and queue stats.

📈 Prometheus

Vesper exposes Prometheus metrics at GET /metrics. The repo ships with a ready-to-use prometheus.yml for a local Vesper process on :8080.

Normal usage:

  1. Start the app with API_ADDR=":8080".

  2. Keep the repo root prometheus.yml as:

    global:
      scrape_interval: 15s
      evaluation_interval: 15s
    
    scrape_configs:
      - job_name: vesper
        metrics_path: /metrics
        static_configs:
          - targets:
              - localhost:8080
  3. Run Prometheus from the repo root:

    prometheus --config.file=prometheus.yml
  4. Open http://localhost:9090/targets and confirm the vesper target is UP.

Docker container usage for Prometheus:

  1. Keep the Vesper app reachable on the host at :8080.

  2. Change the scrape target in prometheus.yml to:

    global:
      scrape_interval: 15s
      evaluation_interval: 15s
    
    scrape_configs:
      - job_name: vesper
        metrics_path: /metrics
        static_configs:
          - targets:
              - host.docker.internal:8080
  3. Start Prometheus in Docker and mount the config file:

    docker run --rm -p 9090:9090 -v "${PWD}/prometheus.yml:/etc/prometheus/prometheus.yml" prom/prometheus
  4. Open http://localhost:9090/targets and confirm the vesper target is UP.

localhost:8080 works when Prometheus runs directly on your machine. host.docker.internal:8080 is the safer choice when Prometheus runs in a container on Windows or macOS.

✅ Testing & CI

Run tests locally:

cd vesper
go test ./...

CI:

  • GitHub Actions runs go test ./... and go vet ./... inside vesper.
  • Workflow file: .github/workflows/ci.yml

🚧 Roadmap & Status

Completed Features (V2.1)

  • Priority-based Job Processing: 4-level priority system with FIFO within levels
  • Burst Job Handling: Handle 10k+ concurrent job submissions
  • Distributed Architecture: Multi-machine worker coordination
  • Fleet Management: Dynamic worker registration and heartbeat monitoring
  • Load Testing Suite: Production-ready performance validation scripts
  • Comprehensive API: Job submission, retrieval, worker monitoring
  • Production Monitoring: Prometheus metrics and health checks

🚧 Next Milestones (V2.2)

  • 📊 Grafana Dashboards: Complete observability stack with visual monitoring

🎯 Future Enhancements

  • 🔄 Job Scheduling: Cron-like delayed job execution
  • 📦 Batch Operations: Atomic multi-job transactions
  • 🔐 Authentication: API security and access control
  • 🌍 Multi-Region: Cross-datacenter job distribution

📈 Current Maturity: 9.0/10

Production-ready for most use cases. The upcoming Grafana observability stack will round out enterprise-grade status.

🤝 Contributing

This is a learning project focused on understanding distributed systems concepts. Feel free to:

  • Report issues or bugs
  • Suggest improvements for V2
  • Submit pull requests for bug fixes
  • Share feedback on architecture decisions

📝 License

Apache-2.0 License - see LICENSE file for details.
