Skip to content

Imbirel/nestjs-microservices

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

4 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

🚀 NestJS Microservices: Event-Driven Order Processing with RabbitMQ & Telegram Notifications

NestJS RabbitMQ PostgreSQL Redis Telegram Docker

Надёжная микросервисная архитектура на NestJS с событийно-ориентированным подходом. Принимает заказы через HTTP, асинхронно обрабатывает их и отправляет уведомления в Telegram. Гарантирует доставку событий с помощью Transactional Outbox + CDC (Debezium), фоновые задачи через pg‑boss и rate‑limiting через Redis.

Ключевые решения: Transactional Outbox + CDC, единая outbox‑таблица для событий и команд, многоуровневая идемпотентность (PostgreSQL), Quorum Queues в RabbitMQ, Dead Letter Queue Observer, rate limiting с Lua‑скриптами, пул соединений PgBouncer.

📐 Архитектура

Проект построен как монорепозиторий Nx и состоит из четырёх микросервисов, Debezium‑сервера и общих библиотек.
Главное изменение по сравнению с классическими подходами: нет отдельного outbox‑воркера — вместо него Debezium читает WAL PostgreSQL и напрямую публикует изменения в RabbitMQ.

Сервисы

  • producer-service – HTTP‑контроллер, создаёт заказ и сохраняет событие в outbox‑таблицу в одной транзакции.
  • consumer-service – слушает события заказов, применяет бизнес‑логику и в той же транзакции помещает команду уведомления в ту же outbox‑таблицу.
  • notification-service – получает команды уведомлений, планирует задачу в pg‑boss (singleton по messageId), выполняет отправку в Telegram после получения токена от Redis‑rate‑limiter.
  • dlq-logger-service – подписан на Dead Letter Exchange, сохраняет все недоставленные сообщения в БД.

Хранилища

  • PostgreSQL (один контейнер, три базы):
    • orders_db – заказы, единая outbox_table, таблицы идемпотентности команд.
    • notifications_db – схемы pg‑boss для фоновых задач.
    • dlq_db – журнал мёртвых сообщений.
  • Redis – только для rate limiting (sliding window на Lua).
  • RabbitMQ – два exchange (main topic и dead letter), очереди с x‑delivery‑limit = 5.

Схема взаимодействия сервисов

graph TD
    Client[HTTP Client] -->|POST /orders| Producer[producer-service]
    Producer -->|1. INSERT заказ + outbox-событие + idempotency| OrdersDB[(orders_db)]
    OrdersDB -->|2. CDC outbox_table| Debezium[Debezium Server]
    Debezium -->|3. Публикация событий и команд| RMQ[(RabbitMQ)]
    RMQ -->|4. Событие заказа| Consumer[consumer-service]
    Consumer -->|5. INSERT outbox-команда + idempotency| OrdersDB
    Debezium -->|6. Команда уведомления| RMQ
    RMQ -->|7. Команда уведомления| Notification[notification-service]
    Notification -->|8. Планирование задачи| PgBoss[pg-boss]
    PgBoss -->|9. Хранение задач| NotifDB[(notifications_db)]
    Notification -->|10. Rate limit check| Redis[Redis]
    Notification -->|11. Отправка| Telegram[Telegram API]
    RMQ -->|12. Dead letters| DLQ[dlq-logger-service]
    DLQ -->|13. Сохранение| DlqDB[(dlq_db)]
Loading

Поток данных для одного заказа (sequence)

sequenceDiagram
    participant Client
    participant Producer
    participant PG
    participant Debezium
    participant RMQ
    participant Consumer
    participant Notification
    participant PgBoss
    participant Redis
    participant Telegram

    Client->>Producer: POST /orders (userId, items)
    Producer->>PG: BEGIN\n INSERT orders_table\n INSERT outbox_table (event)\n UPDATE idempotency_commands\n COMMIT
    Producer-->>Client: 200 OK {orderId}
    PG-->>Debezium: WAL flush (event)
    Debezium->>RMQ: Publish to main.topic.exchange\n routing_key=order.event.created
    RMQ->>Consumer: Deliver message (queue orders.process)
    Consumer->>PG: BEGIN\n SELECT idempotency_events (check)\n INSERT idempotency_events\n INSERT outbox_table (command)\n COMMIT
    Consumer-->>RMQ: ACK
    PG-->>Debezium: WAL flush (command)
    Debezium->>RMQ: Publish to main.topic.exchange\n routing_key=notification.command.telegram
    RMQ->>Notification: Deliver message (queue notifications.telegram)
    Notification->>PgBoss: boss.send('send-telegram', ...)
    PgBoss->>Notification: Job scheduled
    Notification-->>RMQ: ACK
    PgBoss->>Redis: acquireNotificationToken (Lua sliding window)
    Redis-->>PgBoss: Token granted (wait 0)
    PgBoss->>Telegram: POST sendMessage
    Telegram-->>PgBoss: 200 OK
    PgBoss->>PgBoss: Mark job done
    Note over Consumer,Notification: Если сообщение не обработано после 5 попыток (x-delivery-limit), оно идёт в DLX.
    RMQ->>DLQ: Dead message
    DLQ->>PG: INSERT dlq_messages
Loading

✨ Основные возможности

  • ✅ Transactional Outbox + CDC – единая таблица outbox_table для событий и команд; Debezium читает WAL и гарантирует доставку без поллинга.
  • ✅ Динамическая маршрутизация событий – routing key берётся из колонки aggregate_type, что позволяет добавлять новые типы событий без изменения конфигурации Debezium.
  • ✅ Многоуровневая идемпотентность:
    • Команды – защита через idempotency_commands в producer‑сервисе (пессимистичная блокировка + возврат сохранённого ответа) и через singletonKey в pg‑boss для уведомлений.
    • События – защита через idempotency_events (вставка с ON CONFLICT DO NOTHING внутри транзакции).
  • ✅ Версионирование контрактов – каждое DTO содержит поле version; обработчики валидируют его и отклоняют несовместимые версии.
  • ✅ Quorum Queues + Dead Letter Exchange – очереди устойчивы к потере данных; после 5 неудачных попыток сообщение перемещается в DLX и сохраняется в БД.
  • ✅ Rate Limiting (Sliding Window) – Redis с Lua‑скриптом, гарантирующим не более 25 запросов/сек к Telegram API.
  • ✅ Pg‑boss для фоновых задач – singleton‑задачи по messageId, 5 ретраев с экспоненциальным бэк‑оффом; после исчерпания остаются со статусом failed.
  • ✅ PgBouncer – пул соединений для эффективной работы с PostgreSQL.
  • ✅ Модульная структура – общие библиотеки infra-rmq, idempotency, outbox, infra-pg.
  • ✅ Полная Docker-упаковка – все окружение (включая Debezium и PgBouncer) поднимается одной командой.
  • ✅ Health‑check – простой эндпоинт /orders/health в producer‑сервисе.

🧰 Стек технологий

Категория Инструмент / Технология
Фреймворк NestJS 11
Монорепозиторий Nx
Брокер сообщений RabbitMQ 4.3 (quorum queues)
База данных PostgreSQL 18.4
CDC (Change Data Capture) Debezium Server 3.0
Фоновые задачи pg-boss
Кэш / Rate Limiting Redis 7.4
Пул соединений PgBouncer 1.24
Уведомления Telegram Bot API
Контейнеризация Docker + Docker Compose
Тестирование Jest
Управление пакетами pnpm

📁 Структура проекта

nestjs-microservices/
├── apps/
│   ├── producer-service/        # HTTP API, создание заказов, outbox, очистка
│   ├── consumer-service/        # Обработка событий заказов, публикация команд
│   ├── notification-service/    # Приём команд уведомлений, pg‑boss, Telegram
│   └── dlq-logger-service/      # Сохранение мёртвых сообщений
├── libs/
│   ├── infra-rmq/               # Топология RabbitMQ, общие константы
│   ├── infra-pg/                # Пул подключений PostgreSQL
│   ├── idempotency/             # Сервис идемпотентности событий (БД)
│   └── outbox/                  # Репозиторий outbox (сохранение, удаление)
├── debezium-conf/               # Конфигурация Debezium Server
├── compose.yaml                 # Оркестрация всех сервисов и инфраструктуры
├── init-db.sql                  # Инициализация таблиц и баз данных
└── README.md

🚀 Быстрый старт

  1. Клонируйте репозиторий.

  2. Создайте файл .env на основе .env.example с переменной TELEGRAMBOT_TOKEN=<ваштокен>.

  3. Запустите систему:

docker compose up --build
  1. Отправьте заказ:
curl -X POST http://localhost:3000/orders \
  -H "Content-Type: application/json" \
  -d '{"userId": "user_123", "items": [{"product": "laptop", "quantity": 1}]}'

или прямо в консоли VScode (windows):

$body = @{
    userId = "user_123"
    items = @(
        @{ product = "laptop"; quantity = 1 }
    )
} | ConvertTo-Json -Depth 5

Invoke-RestMethod -Uri "http://localhost:3000/orders" -Method Post -ContentType "application/json" -Body $body

Ответ:

orderId                              status
-------                              ------
72859f04-e090-49ce-829c-c63353a8bf47 created

Через несколько секунд в Telegram (бот должен быть активирован пользователем) придёт уведомление.

Healthchek: GET http://localhost:3000/orders/health

  1. Проверяем статус Debezium (CDC)
docker compose logs debezium-server --tail 50

Ответ:

debezium_cdc  | 13:14:25 INFO  [common.BaseSourceTask] (pool-10-thread-1) 1 records sent during previous 00:00:23.24, last recorded offset ... messageType=INSERT
  1. Проверить RabbitMQ

http://localhost:15672/

Авторизуйтесь: Введите доступы из compose-файла: Username: guest, Password: guest

🧪 Тестирование

Проект полностью покрыт модульными тестами (Unit-тестирование обработчиков, сервисов и идемпотентных цепочек). Для изоляции внешних систем (PostgreSQL, RabbitMQ, pg-boss) применено мокирование зависимостей на уровне NestJS TestingModule.

Поскольку проект построен на базе Nx, запуск тестов для всех приложений и библиотек выполняется из корня монорепозитория одной командой, без необходимости вручную прописывать локальные скрипты в package.json каждого сервиса.

# Запустить абсолютно все тесты в монорепозитории
pnpm nx run-many -t test

# Запустить тесты только для конкретного микросервиса
pnpm nx test producer-service
pnpm nx test consumer-service
pnpm nx test notification-service

# Запустить тесты с автоматической генерацией отчета о покрытии (Coverage)
pnpm nx run-many -t test --coverage

После запуска с флагом --coverage подробный HTML-отчет о покрытии кода будет доступен в корневой директории coverage/.

🔒 Отказоустойчивость и гарантии

  • Идемпотентность команд: дублирующаяся команда с тем же commandId возвращает сохранённый ответ, повторно не обрабатывая.
  • Идемпотентность событий: проверка через INSERT ... ON CONFLICT DO NOTHING внутри транзакции, дубликаты игнорируются.
  • Dead Letter Queue: сообщения, не обработанные после 5 попыток, сохраняются в БД, а не теряются.
  • Версионирование: хэндлеры принимают только поддерживаемые версии DTO, иначе сообщение отправляется в DLQ.
  • Устойчивость очереди: quorum‑очереди RabbitMQ реплицируют данные, x‑delivery‑limit предотвращает бесконечные ретраи.

🧩 Осознанные упрощения и направления для production

  • Разделение контекстов баз данных (Shared Database): На текущем этапе producer-service (запись заказов) и consumer-service (обработка заказов) используют общую базу данных orders_db, так как они находятся в границах единого бизнес-домена. При этом сервисы уведомлений (notifications_db) и логирования ошибок (dlq_db) уже полностью изолированы в свои выделенные базы данных. В production-окружении для предотвращения конфликтов за ресурсы (Resource Contention) и обеспечения независимого масштабирования базы данных продюсера и консумера также должны быть физически разделены (например, с переходом на паттерн CQRS).
  • Атомарность outbox → pg‑boss: команда уведомления сначала попадает в outbox (Debezium доставит), а затем шедулится задача в pg‑boss. При сбое boss.send сообщение вернётся в очередь, и обработка повторится; singletonKey гарантирует идемпотентность. В продакшене можно рассмотреть единый персистентный планировщик (Kafka Connect) для полной атомарности.
  • Очистка outbox без проверки LSN: записи удаляются по времени, без сверки с confirmed_flush_lsn (для этого нужны права pg_monitor, не выданные в демо‑окружении). В production необходимо гарантировать, что Debezium обработал запись перед удалением.
  • PgBouncer + транзакционный режим сессий: драйвер Node-Postgres (pg) по умолчанию пытается динамически управлять сессионными параметрами (например, выставлять statement_timeout при каждом запросе). Для совместимости с PgBouncer в режиме transaction управление таймаутами полностью вынесено на уровень СУБД (через ALTER USER ... SET statement_timeout).
  • Мониторинг failed‑задач pg‑boss: после исчерпания ретраев задачи остаются в таблицах pg‑boss со статусом failed. Нет алертинга. В боевом окружении необходимо подключить оповещения.
  • Безопасность: используются дефолтные учётные данные, HTTP API без аутентификации. В реальном проекте — Vault, JWT, mTLS.
  • Наблюдаемость: отсутствует распределённая трассировка и централизованные логи. Добавлен только health‑check. Рекомендуется OpenTelemetry, сбор correlationId.
  • Управление очередями: DLQ создаются вручную, а основные — через декораторы. При росте числа очередей стоит автоматизировать топологию через CI/CD.

📈 Дальнейшее развитие

  • Saga для распределённых транзакций (Inventory, Payment).
  • CQRS с разделением моделей чтения/записи и окончательным разделением orders_db на изолированные базы данных для каждого микросервиса.
  • Contract Testing (Pact).
  • OpenTelemetry для трассировки.
  • Schema Registry для централизованного управления контрактами.
  • Replay‑утилита для возврата сообщений из DLQ.

Releases

No releases published

Packages

 
 
 

Contributors