Queue Example
This example demonstrates reliable message processing with SQS queues and dead letter queues for handling failures.
Overview
The queue example shows:
- SQS queue with Lambda worker integration
- Dead letter queue for failed messages
- Automatic retry logic
- Message processing patterns
- Result storage in database
Code
typescript
import Nimbus from '@hillock-tech/nimbus-js';
const app = new Nimbus({
projectName: 'queue-demo',
stage: 'dev',
});
// Database to store processed results
const results = app.KV({ name: 'results' });
// Queue with dead letter queue for reliability
const taskQueue = app.Queue({
name: 'tasks',
deadLetterQueue: {
enabled: true,
maxRetries: 3, // Try 3 times before sending to DLQ
},
worker: async (event) => {
console.log('Processing', event.Records.length, 'messages');
for (const record of event.Records) {
try {
const task = JSON.parse(record.body);
console.log('Processing task:', task.id);
// Simulate work (with occasional failure for DLQ demo)
if (task.shouldFail) {
throw new Error('Simulated task failure');
}
// Process the task
await processTask(task);
// Save result using DynamoDB
const { DynamoDBClient } = require('@aws-sdk/client-dynamodb');
const { DynamoDBDocumentClient, PutCommand } = require('@aws-sdk/lib-dynamodb');
const client = new DynamoDBClient({});
const docClient = DynamoDBDocumentClient.from(client);
const tableName = process.env.KV_RESULTS;
await docClient.send(new PutCommand({
TableName: tableName,
Item: {
id: task.id,
...task,
status: 'completed',
completedAt: new Date().toISOString(),
}
}));
console.log('Task completed:', task.id);
} catch (error) {
console.error('Task failed:', error);
throw error; // Will retry up to maxRetries, then go to DLQ
}
}
},
});
// API to submit tasks
const api = app.API({ name: 'queue-api' });
// Submit task to queue
api.route('POST', '/tasks', async (event) => {
const { SQSClient, SendMessageCommand } = require('@aws-sdk/client-sqs');
try {
const taskData = JSON.parse(event.body || '{}');
const task = {
id: Date.now().toString(),
...taskData,
submittedAt: new Date().toISOString(),
status: 'queued',
};
// Send to queue for processing
const sqs = new SQSClient({});
const queueUrl = process.env.QUEUE_TASKS;
await sqs.send(new SendMessageCommand({
QueueUrl: queueUrl,
MessageBody: JSON.stringify(task)
}));
return {
statusCode: 202,
body: JSON.stringify({
id: task.id,
message: 'Task queued for processing',
}),
};
} catch (error) {
console.error('Error queuing task:', error);
return {
statusCode: 500,
body: JSON.stringify({ error: 'Failed to queue task' }),
};
}
});
// Get task result
api.route('GET', '/tasks/{id}', async (event) => {
const { DynamoDBClient } = require('@aws-sdk/client-dynamodb');
const { DynamoDBDocumentClient, GetCommand } = require('@aws-sdk/lib-dynamodb');
try {
const id = event.pathParameters?.id;
const client = new DynamoDBClient({});
const docClient = DynamoDBDocumentClient.from(client);
const tableName = process.env.KV_RESULTS;
const result = await docClient.send(new GetCommand({
TableName: tableName,
Key: { id }
}));
if (!result.Item) {
return {
statusCode: 404,
body: JSON.stringify({ error: 'Task not found' }),
};
}
return {
statusCode: 200,
body: JSON.stringify(result.Item),
};
} catch (error) {
console.error('Error getting task:', error);
return {
statusCode: 500,
body: JSON.stringify({ error: 'Failed to get task' }),
};
}
});
// List all results
api.route('GET', '/tasks', async () => {
const { DynamoDBClient } = require('@aws-sdk/client-dynamodb');
const { DynamoDBDocumentClient, ScanCommand } = require('@aws-sdk/lib-dynamodb');
try {
const client = new DynamoDBClient({});
const docClient = DynamoDBDocumentClient.from(client);
const tableName = process.env.KV_RESULTS;
const allResults = await docClient.send(new ScanCommand({
TableName: tableName
}));
return {
statusCode: 200,
body: JSON.stringify({
tasks: allResults.Items || [],
count: allResults.Count || 0,
}),
};
} catch (error) {
console.error('Error listing tasks:', error);
return {
statusCode: 500,
body: JSON.stringify({ error: 'Failed to list tasks' }),
};
}
});
// Simulate task processing
async function processTask(task) {
// Simulate some work
await new Promise(resolve => setTimeout(resolve, 100));
console.log(`Processed task: ${task.name || 'Unnamed task'}`);
}
// Export for CLI deployment
export default app;
console.log('🌐 API URL:', result.apis[0]?.url);
console.log('📦 Queue:', result.queues?.[0]?.name);
})
.catch(console.error);Setup
Create project directory:
bashmkdir queue-example cd queue-exampleInitialize project:
bashnpm init -y npm install nimbus @aws-sdk/client-sqs @aws-sdk/client-dynamodb @aws-sdk/lib-dynamodb npm install -D @types/node tsx typescriptInitialize Nimbus state:
bashnpx nimbus initCreate the application file: Save the code above as
index.tsAdd package.json scripts:
json{ "scripts": { "deploy": "npx nimbus deploy", "destroy": "npx nimbus destroy --project queue-demo --region us-east-1 --force" } }
Deploy
bash
npm run deployThis will create:
- SQS queue with dead letter queue
- DynamoDB table for results
- Lambda worker function
- API Gateway for task submission
- All necessary IAM permissions
Test the API
Submit a Task
bash
curl -X POST https://YOUR_API_URL/tasks \
-H "Content-Type: application/json" \
-d '{"name": "Process Data", "data": "some data"}'Response:
json
{
"id": "1701234567890",
"message": "Task queued for processing"
}Submit a Failing Task (for DLQ demo)
bash
curl -X POST https://YOUR_API_URL/tasks \
-H "Content-Type: application/json" \
-d '{"name": "Failing Task", "shouldFail": true}'Check Task Status
bash
curl https://YOUR_API_URL/tasks/1701234567890Response:
json
{
"id": "1701234567890",
"name": "Process Data",
"data": "some data",
"status": "completed",
"submittedAt": "2024-12-01T12:00:00.000Z",
"completedAt": "2024-12-01T12:00:01.000Z"
}List All Tasks
bash
curl https://YOUR_API_URL/tasksResponse:
json
{
"tasks": [
{
"id": "1701234567890",
"name": "Process Data",
"status": "completed",
"completedAt": "2024-12-01T12:00:01.000Z"
}
],
"count": 1
}Key Features Demonstrated
1. Reliable Message Processing
- SQS Queue: Durable message storage
- Dead Letter Queue: Failed messages after 3 retries
- Automatic Retries: Transient failures handled automatically
- Error Isolation: One bad message doesn't stop others
2. Async Processing Flow
- Submit task → API puts message in queue
- Worker processes → Lambda function triggered automatically
- Success → Result saved to database
- Failure → Retries 3 times, then goes to DLQ
3. Environment Variables
QUEUE_TASKS- SQS queue URL (auto-injected)KV_RESULTS- DynamoDB table name (auto-injected)
Advanced Usage
Batch Processing
typescript
const batchQueue = app.Queue({
name: 'batch-tasks',
batchSize: 10, // Process up to 10 messages at once
worker: async (event) => {
const tasks = event.Records.map(record => JSON.parse(record.body));
// Process all tasks in parallel
const results = await Promise.allSettled(
tasks.map(task => processTask(task))
);
// Log results
results.forEach((result, index) => {
if (result.status === 'rejected') {
console.error(`Task ${index} failed:`, result.reason);
} else {
console.log(`Task ${index} completed successfully`);
}
});
}
});Priority Queue Pattern
typescript
// High priority queue
const highPriorityQueue = app.Queue({
name: 'high-priority-tasks',
worker: async (event) => {
// Process high priority tasks immediately
for (const record of event.Records) {
const task = JSON.parse(record.body);
await processUrgentTask(task);
}
}
});
// Normal priority queue
const normalPriorityQueue = app.Queue({
name: 'normal-priority-tasks',
worker: async (event) => {
// Process normal priority tasks
for (const record of event.Records) {
const task = JSON.parse(record.body);
await processTask(task);
}
}
});
// Route based on priority
api.route('POST', '/priority-tasks', async (event) => {
const { SQSClient, SendMessageCommand } = require('@aws-sdk/client-sqs');
const task = JSON.parse(event.body || '{}');
const queueUrl = task.priority === 'high'
? process.env.QUEUE_HIGH_PRIORITY_TASKS
: process.env.QUEUE_NORMAL_PRIORITY_TASKS;
const sqs = new SQSClient({});
await sqs.send(new SendMessageCommand({
QueueUrl: queueUrl,
MessageBody: JSON.stringify(task)
}));
return {
statusCode: 202,
body: JSON.stringify({ message: 'Task queued', priority: task.priority })
};
});Message Attributes
typescript
api.route('POST', '/tasks-with-metadata', async (event) => {
const { SQSClient, SendMessageCommand } = require('@aws-sdk/client-sqs');
const task = JSON.parse(event.body || '{}');
const sqs = new SQSClient({});
await sqs.send(new SendMessageCommand({
QueueUrl: process.env.QUEUE_TASKS,
MessageBody: JSON.stringify(task),
MessageAttributes: {
Priority: {
DataType: 'String',
StringValue: task.priority || 'normal'
},
Source: {
DataType: 'String',
StringValue: 'api'
},
Timestamp: {
DataType: 'Number',
StringValue: Date.now().toString()
}
}
}));
return {
statusCode: 202,
body: JSON.stringify({ message: 'Task queued with metadata' })
};
});Delayed Messages
typescript
api.route('POST', '/scheduled-tasks', async (event) => {
const { SQSClient, SendMessageCommand } = require('@aws-sdk/client-sqs');
const { task, delaySeconds } = JSON.parse(event.body || '{}');
const sqs = new SQSClient({});
await sqs.send(new SendMessageCommand({
QueueUrl: process.env.QUEUE_TASKS,
MessageBody: JSON.stringify(task),
DelaySeconds: delaySeconds || 0 // Delay up to 15 minutes
}));
return {
statusCode: 202,
body: JSON.stringify({
message: `Task scheduled for ${delaySeconds} seconds from now`
})
};
});Monitoring Dead Letter Queue
Check for failed messages in AWS Console:
- Go to SQS in AWS Console
- Find the
dev-tasks-dlqqueue - Check for messages that failed processing
Architecture
API Request
↓
SQS Queue
↓
Lambda Worker
↓
DynamoDB (Results)
↓
Dead Letter Queue (Failures)What You Get
- ✅ SQS queue with worker Lambda
- ✅ Dead letter queue for failed messages
- ✅ Automatic retry logic (3 attempts)
- ✅ Database for storing results
- ✅ API for submitting tasks
- ✅ CloudWatch monitoring
Clean Up
bash
npm run destroyThe --force flag removes all resources including data stores.
Next Steps
- Try the Timer example for scheduled processing
- Explore Authentication to secure task submission
- Learn about SQL example for database operations
Troubleshooting
Common Issues
"Queue not found"
- Make sure deployment completed successfully
- Check that environment variable
QUEUE_TASKSis set
Messages not processing
- Check CloudWatch logs for the worker Lambda function
- Verify the worker function isn't throwing unhandled errors
Tasks stuck in queue
- Check the visibility timeout setting
- Ensure the worker function completes within the timeout
DLQ not receiving messages
- Verify maxRetries is set correctly
- Check that the worker function is actually throwing errors for failed tasks