Notification service (MVP): send emails (Resend) and SMS (Twilio) via gRPC, with async processing over RabbitMQ.
- gRPC API – Create and send notifications; enqueue them for async sending. Channels: EMAIL, SMS (PUSH/IN_APP stubbed).
- Async primary queue – Enqueue a notification → RabbitMQ topic notifications.send (Watermill/AMQP) → consumer sends it. Messages are always acked; failed sends increment attempt_count, set an exponential backoff via next_retry_at, and mark the notification FAILED when attempt_count reaches max_attempts.
- Retry queue (multi-pod friendly) – A background worker periodically claims due PENDING rows in Postgres (FOR UPDATE SKIP LOCKED), extends next_retry_at by a short publish lease, and publishes lightweight messages to notifications.retry. Any pod's consumer can process retries; idempotency relies on DB state (PENDING/COMPLETED/FAILED).
- Email – MJML templates (welcome, verify-otp, forgot-password, reset-password) via Resend.
- SMS – Text templates via Twilio; mock client when Twilio is not configured.
- HTTP – Health/admin endpoints.
- Go 1.25+
- Redis (cache)
- PostgreSQL (notification records)
- RabbitMQ (queue; e.g. docker-compose up -d rabbitmq)
- Resend API key (email)
- Twilio (optional; for SMS)
- Install dependencies: go mod download
- Copy the env file, then set the Redis, Postgres, RabbitMQ, and Resend values (Twilio is optional): cp .env.example .env
- Start RabbitMQ (if using Docker): docker-compose up -d rabbitmq
  When running the app on your host, use RABBITMQ_URL=amqp://guest:guest@127.0.0.1:5672/ in .env (avoids IPv6 connection issues).
- Run the app: go run .
  - HTTP: http://localhost:8080 (HTTP_HOST, HTTP_PORT)
  - gRPC: localhost:9090 (GRPC_PORT)
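Because a RABBITMQ_URL that points at localhost is the usual trigger for the host-side connection issue mentioned above, a small preflight check can flag it before the app starts. This is a hypothetical sketch (amqpHost is not part of this repo): it only parses the URL with the standard net/url package and warns on localhost; it does not open a connection to the broker.

```go
package main

import (
	"fmt"
	"net/url"
	"os"
)

// amqpHost returns the host part of an AMQP URL such as
// amqp://guest:guest@127.0.0.1:5672/ (hypothetical helper).
func amqpHost(raw string) (string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", err
	}
	if u.Scheme != "amqp" && u.Scheme != "amqps" {
		return "", fmt.Errorf("unexpected scheme %q", u.Scheme)
	}
	return u.Hostname(), nil
}

func main() {
	raw := os.Getenv("RABBITMQ_URL")
	if raw == "" {
		raw = "amqp://guest:guest@127.0.0.1:5672/"
	}
	host, err := amqpHost(raw)
	if err != nil {
		fmt.Println("invalid RABBITMQ_URL:", err)
		os.Exit(1)
	}
	if host == "localhost" {
		// "localhost" may resolve to the IPv6 loopback (::1) before 127.0.0.1.
		fmt.Println("warning: prefer 127.0.0.1 over localhost when running on the host")
	}
	fmt.Println("AMQP host:", host)
}
```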
| Group | Variable | Description |
|---|---|---|
| App | APP_NAME, APP_VERSION | Application identity |
| Server | HTTP_HOST, HTTP_PORT, GRPC_PORT | Listen addresses |
| Redis | REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_DB | Cache |
| Postgres | POSTGRES_* | Database connection |
| RabbitMQ | RABBITMQ_URL | AMQP URL. Use 127.0.0.1 on the host; rabbitmq inside Docker. |
| Notification | NOTIFICATION_MAX_ATTEMPTS | Max send attempts before the status becomes FAILED (default in code: 3). |
| Notification | NOTIFICATION_RETRY_INTERVAL_SEC | How often the retry scheduler runs, in seconds (default: 60). |
| Notification | NOTIFICATION_RETRY_BATCH_SIZE | Max rows claimed per tick (default: 10). |
| Notification | NOTIFICATION_RETRY_BACKOFF_INITIAL_SEC | Base backoff after a failure, in seconds (default: 30). |
| Notification | NOTIFICATION_RETRY_BACKOFF_MAX_SEC | Backoff cap, in seconds (default: 3600). |
| Notification | NOTIFICATION_RETRY_PUBLISH_LEASE_SEC | Seconds added to next_retry_at when a row is claimed, so other pods do not enqueue the same row again until the lease expires or the consumer updates the row (default: 300). |
| Email | EMAIL_SENDER, EMAIL_RESEND_API_KEY, EMAIL_TEMPLATE_DIR | Resend |
| SMS | TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN, TWILIO_FROM_NUMBER, SMS_TEMPLATE_DIR | Twilio (optional) |
| Logging | LOG_LEVEL | e.g. info, debug |
If Twilio is not set, a mock SMS client is used.
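The mock fallback can be expressed as a constructor that picks an implementation from the environment. SMSSender, twilioSender, mockSender, and newSMSSender are hypothetical names, and treating any missing Twilio variable as "not configured" is an assumption; the actual clients live in pkg/sms. The Twilio branch is a placeholder and does not call the Twilio API.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// SMSSender is a hypothetical interface; the real one lives in pkg/sms.
type SMSSender interface {
	Send(to, body string) error
	Name() string
}

type twilioSender struct{ sid, token, from string }

// Send is a placeholder: a real client would call the Twilio Messages API here.
func (t twilioSender) Send(to, body string) error {
	return errors.New("twilio sending not implemented in this sketch")
}
func (t twilioSender) Name() string { return "twilio" }

// mockSender only logs, so local development works without Twilio credentials.
type mockSender struct{}

func (mockSender) Send(to, body string) error {
	fmt.Printf("[mock sms] to=%s body=%q\n", to, body)
	return nil
}
func (mockSender) Name() string { return "mock" }

// newSMSSender returns the mock when any Twilio setting is missing (assumption).
// getenv is injected so the choice can be tested without touching the real environment.
func newSMSSender(getenv func(string) string) SMSSender {
	sid := getenv("TWILIO_ACCOUNT_SID")
	token := getenv("TWILIO_AUTH_TOKEN")
	from := getenv("TWILIO_FROM_NUMBER")
	if sid == "" || token == "" || from == "" {
		return mockSender{}
	}
	return twilioSender{sid: sid, token: token, from: from}
}

func main() {
	s := newSMSSender(os.Getenv)
	fmt.Println("using SMS sender:", s.Name())
	_ = s.Send("+15550100", "Your code is 123456")
}
```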
.
├── config/ # App config, env loading
├── internal/
│ ├── aggregate/ # DTOs (e.g. SendNotificationReq, enqueue/retry payloads)
│ ├── model/ # Domain models
│ ├── repository/ # Data access (transactions, locking, RecordSendFailure SQL)
│ ├── service/ # Notification logic; publish to AMQP from service layer
│ ├── shared/constant/ # Template maps, event topics
│ └── errorx/ # Error types
├── pkg/
│ ├── email/ # Resend client, MJML renderer
│ ├── sms/ # Twilio + mock, body renderer
│ ├── logger/ # Logger (Zap)
│ ├── cache/ # Redis
│ ├── database/ # Postgres
│ └── validator/
├── presentation/
│ ├── events/ # AMQP: LoggerAdapter, publisher/subscriber, router, consumers
│ ├── worker/ # Retry scheduler ticker (enqueue due rows to retry topic)
│ ├── grpc/ # gRPC server, NotiInternal
│ └── http/ # HTTP server
├── templates/
│ ├── email/ # MJML (.mjml)
│ └── sms/ # Text (.txt)
└── examples/
| Topic | Role |
|---|---|
| notifications.send | Initial async send after enqueue. Payload includes the full SendNotificationReq. |
| notifications.retry | Retry attempts for rows that have already failed at least once and are still PENDING under max_attempts. Payload is { "notificationId": "..." }; the consumer loads the row from Postgres. |
Constants live in internal/shared/constant/event.go.
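The retry payload is small enough to show in full. This sketch assumes a struct named RetryPayload (the real DTOs are in internal/aggregate) and uses encoding/json to produce and validate the documented { "notificationId": "..." } body.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RetryPayload matches the documented notifications.retry body (hypothetical name).
type RetryPayload struct {
	NotificationID string `json:"notificationId"`
}

// encodeRetry builds the message body published for one claimed row.
func encodeRetry(id string) ([]byte, error) {
	return json.Marshal(RetryPayload{NotificationID: id})
}

// decodeRetry parses a message body and rejects payloads without an ID.
func decodeRetry(b []byte) (RetryPayload, error) {
	var p RetryPayload
	if err := json.Unmarshal(b, &p); err != nil {
		return RetryPayload{}, err
	}
	if p.NotificationID == "" {
		return RetryPayload{}, fmt.Errorf("missing notificationId")
	}
	return p, nil
}

func main() {
	b, _ := encodeRetry("n-123")
	fmt.Println(string(b)) // {"notificationId":"n-123"}
	p, err := decodeRetry(b)
	fmt.Println(p.NotificationID, err) // n-123 <nil>
}
```

Keeping the retry message to an ID means the consumer always acts on the current row state in Postgres rather than on a possibly stale copy in the message.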
- Consumer: ProcessNotificationFromQueue.
- On success: status COMPLETED, sent_at set.
- On failure: RecordSendFailure atomically increments attempt_count and sets FAILED if attempts are exhausted; otherwise it sets next_retry_at using exponential backoff, min(initial × 2^attempt_count, max) (see repository SQL).
- Scheduler (presentation/worker/retry.go): on an interval, runs EnqueuePendingRetries, which opens a transaction, selects eligible rows with SKIP LOCKED, sets next_retry_at to now + publish lease, commits, and then publishes one message per row to notifications.retry.
- Retry consumer: ProcessNotificationRetryFromQueue loads the notification by ID; if it is no longer a valid retry target, the message is acked without sending. Otherwise it sends as on the primary path and records success or calls RecordSendFailure again.
Messages are always acked so the broker does not block; correctness depends on DB state and the lease/backoff fields.
Ensure the RabbitMQ topology binds both topics, configured the same way as the existing notifications.send setup (Watermill durable queue config).
- EMAIL: WELCOME, VERIFY_OTP, FORGOT_PASSWORD, RESET_PASSWORD → templates/email/ (MJML).
- SMS: e.g. VERIFY_OTP → templates/sms/ (text).

The mapping is in internal/shared/constant/template.go.
go test ./...

Some service tests use an in-memory SQLite database to exercise the transactional retry-claim logic; production uses PostgreSQL.
Private / internal use.