Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 161 additions & 0 deletions .claude/tasks/ongoing/02-dlq-requeue-feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
# Dead Letter Queue Requeue Feature

## Overview
API endpoint to requeue messages from a dead letter queue to a specified destination queue using the RabbitMQ Shovel plugin.

## Requirements
- Support requeue all messages OR a specific count (via query parameter)
- Require explicit destination queue in request body
- Use RabbitMQ Shovel plugin for atomic message transfers
- Self-deleting shovel (no polling, no manual cleanup)
- Fail gracefully if requeue already in progress
- Return estimated message count in response
- Clear error handling when shovel plugin is not enabled

## API Design

### Endpoint
```
POST /dlq/:queueName/requeue?count=N
```

**Parameters:**
- `queueName` (path): Name of the dead letter queue to consume from
- `count` (query, optional): Number of messages to requeue. If omitted, requeues all.

**Request Body:**
```json
{
"destinationQueue": "target-queue"
}
```

**Response:**
```json
{
"status": "initiated",
"requeued": 5,
"failed": 0,
"errors": []
}
```

- `status`: `"initiated"` (async, shovel created) or `"completed"` (sync, e.g. MemoryQueue)
- `requeued`: Estimated number of messages to be moved (queried before shovel creation)

**Error Responses:**
- `400`: Missing/invalid `destinationQueue` or invalid `count`
- `409`-style error: Requeue already in progress for this queue
- `500`: Shovel plugin not available, auth failure, or connection error

## Implementation

### 1. Types in Queue class
**File:** `src/queue/index.ts`

```typescript
export type RequeueOptions = {
count?: number; // Number of messages to requeue (undefined = all)
destinationQueue: string; // Target queue to move messages to
}

export type RequeueResult = {
status: 'initiated' | 'completed';
requeued: number; // Estimated (initiated) or actual (completed) count
failed: number;
errors: Array<{ error: string }>;
}
```

### 2. AmqpQueue implementation via Shovel plugin
**File:** `src/queue/drivers/amqp.ts`

Uses RabbitMQ Management API:

1. **Check for existing shovel** via `GET /api/shovels/{vhost}/{name}`
- Uses deterministic name: `arnavon-requeue-{sourceQueue}`
- Fails if shovel already exists (requeue in progress)

2. **Get queue message count** via `GET /api/queues/{vhost}/{queue}`
- Used to return estimated requeue count

3. **Create self-deleting shovel** via `PUT /api/parameters/shovel/{vhost}/{name}`
- `src-delete-after`: `count` or `'queue-length'` (auto-deletes when done)
- `ack-mode`: `'on-confirm'` for safety

4. **Return immediately** with estimated count
- No polling, no manual deletion
- Shovel auto-deletes after moving messages

Key shovel configuration:
```json
{
"value": {
"src-uri": "amqp://...",
"src-queue": "dead-letters",
"dest-uri": "amqp://...",
"dest-queue": "send-email",
"src-delete-after": "queue-length",
"ack-mode": "on-confirm"
}
}
```

### 3. MemoryQueue implementation
**File:** `src/queue/drivers/memory.ts`

Synchronous implementation moving items between named queues for testing.
Returns `status: 'completed'` with actual count.

### 4. REST API endpoint
**File:** `src/server/rest/index.ts`

```typescript
api.post('/dlq/:queueName/requeue', async (req, res, next) => {
const { queueName } = req.params;
const count = req.query.count ? parseInt(req.query.count as string, 10) : undefined;
const { destinationQueue } = req.body || {};
// Validation and call to Arnavon.queue.requeue()
});
```

## Files Modified

1. `src/queue/index.ts` - Add `requeue` method and types
2. `src/queue/drivers/amqp.ts` - Implement via Shovel API
3. `src/queue/drivers/memory.ts` - Simple queue-to-queue move
4. `src/server/rest/index.ts` - Add `/dlq/:queueName/requeue` endpoint

## Configuration

### RabbitMQ Plugins
**File:** `example/rabbitmq/enabled_plugins`
```erlang
[rabbitmq_management,rabbitmq_shovel,rabbitmq_shovel_management].
```

### Docker Compose
**File:** `example/docker-compose.yml`
- RabbitMQ 4.x with management image
- Mount enabled_plugins file
- Healthcheck for service readiness

## Testing

### Unit Tests
- `tests/queue/index.spec.ts` - Base Queue class requeue method
- `tests/queue/drivers/memory.spec.ts` - MemoryQueue requeue
- `tests/server/rest/api.spec.ts` - REST API endpoint tests

### Integration Tests
- `example/webspicy/arnavon/dlq/_queueName/requeue/post.yml` - Webspicy tests
- `example/webspicy/schema.fio` - Finitio types for request/response

### Manual Testing
1. Start RabbitMQ with shovel plugins enabled
2. Push jobs to a queue with DLQ configured
3. Force jobs to fail (so they go to DLQ)
4. Call `POST /dlq/dead-letters/requeue -d '{"destinationQueue": "send-email"}'`
5. Verify response contains estimated count
6. Verify messages are moved to destination queue
7. Verify shovel auto-deleted after completion
12 changes: 6 additions & 6 deletions example/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ consumers:
type: binary
config:
path: ./consumers/mailer-worker.rb
- name: log-failures
queue: dead-letters
runner:
type: nodejs
config:
module: ./consumers/logger
# - name: log-failures
# queue: dead-letters
# runner:
# type: nodejs
# config:
# module: ./consumers/logger
# - name: log-invalid-jobs
# queue: invalid-jobs
# runner:
Expand Down
10 changes: 8 additions & 2 deletions example/consumers/mailer-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,20 @@ const transporter = nodemailer.createTransport({
host: 'fakesmtp',
port: 25,
secure: false,
ignoreTLS: true
ignoreTLS: true,
});

module.exports = (job, { dispatcher, logger }) => {
const email = Object.assign({}, job.payload, {
to: [].concat(job.payload.to).filter(Boolean).join(', ')
to: [].concat(job.payload.to).filter(Boolean).join(', '),
});

if (email.to.includes('fail@enspirit.be')) {
throw new Error('Email sending failed');
}

logger.info({ email }, 'emailing');

return transporter.sendMail(email)
.then((result) => {
// log success results (example could be to save results on cold storage such as s3)
Expand Down
21 changes: 15 additions & 6 deletions example/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
version: '3.3'

services:

api:
depends_on:
- rabbitmq
rabbitmq:
condition: service_healthy
image: quadrabee/arnavon
ports:
- 3000:80
Expand All @@ -17,8 +16,10 @@ services:

workers:
depends_on:
- rabbitmq
- fakesmtp
rabbitmq:
condition: service_healthy
fakesmtp:
condition: service_started
image: example/consumers
build:
context: consumers
Expand All @@ -40,13 +41,21 @@ services:
- 1080:1080

rabbitmq:
image: rabbitmq:3.8.11-management
image: rabbitmq:4-management
ports:
- 5672:5672
- 15672:15672
environment:
RABBITMQ_DEFAULT_USER: rabbit
RABBITMQ_DEFAULT_PASS: rabbit
volumes:
# Enable shovel plugins for DLQ requeue functionality
- ./rabbitmq/enabled_plugins:/etc/rabbitmq/enabled_plugins
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "-q", "ping"]
interval: 10s
timeout: 5s
retries: 5

webspicy:
build: ./webspicy
Expand Down
1 change: 1 addition & 0 deletions example/rabbitmq/enabled_plugins
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[rabbitmq_management,rabbitmq_shovel,rabbitmq_shovel_management].
83 changes: 83 additions & 0 deletions example/webspicy/arnavon/dlq/_queueName/requeue/post.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
---
name: |-
Requeue messages from DLQ

url: |-
/dlq/{queueName}/requeue

services:
- method: |-
POST

description: |-
Move messages from a dead letter queue to a destination queue
using the RabbitMQ Management API.

preconditions:
- The RabbitMQ Management plugin must be enabled
- The source queue must exist
- The destination queue must be specified

postconditions:
- Messages are moved from source to destination queue
- The response contains the count of requeued messages

input_schema: |-
Dlq.RequeueRequest

output_schema: |-
Dlq.RequeueResult

error_schema: |-
Dlq.Error

examples:

- description: |-
FOR: Messages are moved from source to destination queue
params:
queueName: dead-letters
destinationQueue: send-email
expected:
content_type: application/json; charset=utf-8
status: 200
assert:
- 'pathFD("", status: "initiated")'
- 'pathFD("", requeued: 0)'
- 'pathFD("", failed: 0)'
- 'pathFD("", errors: [])'

- description: |-
FOR: A specific count of messages can be moved
params:
queueName: dead-letters
destinationQueue: send-email
count: 10
expected:
content_type: application/json; charset=utf-8
status: 200

counterexamples:

- description: |-
FOR: The destination queue must be specified
dress_params: false
params:
queueName: dead-letters
expected:
content_type: application/json; charset=utf-8
status: 400
assert:
- match /destinationQueue is required/

- description: |-
FOR: The destination queue must be a string
dress_params: false
params:
queueName: dead-letters
destinationQueue: 123
expected:
content_type: application/json; charset=utf-8
status: 400
assert:
- match /destinationQueue is required/
41 changes: 41 additions & 0 deletions example/webspicy/arnavon/queues/get.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
---
name: |-
Get queues status

url: |-
/queues

services:
- method: |-
GET

description: |-
Retrieve status information for all queues defined in the Arnavon
configuration, including message count and consumer count.

preconditions:
- RabbitMQ Management plugin must be enabled
- Arnavon must be configured with queue topology

postconditions:
- Returns status for all configured queues
- Each queue includes name, messages, consumers, and state

input_schema: |-
.

output_schema: |-
Queue.ListResponse

error_schema: |-
.

examples:

- description: |-
FOR: Returns status for all configured queues
expected:
content_type: application/json; charset=utf-8
status: 200
assert:
- 'size("queues", 4)'
2 changes: 2 additions & 0 deletions example/webspicy/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
c.postcondition JobEnqueuedInMetrics
c.postcondition EmailsSent
c.postcondition EmailsSentInMetrics
c.postcondition NoEmailSent
c.postcondition JobSentToDeadQueue

c.errcondition NoEmailSent
c.errcondition JobErrorsInMetrics
Expand Down
Loading
Loading