Node.js
Queues & Jobs
Background job processing with Bull and BullMQ — create queues, add jobs, process them with workers, handle failures, and connect to Redis.
Bull Install: Install Bull together with ioredis, the Redis client it uses. Bull requires a running Redis instance.
terminal
JS
npm install bull ioredis
# .env
# Redis connection settings; the queue examples read these through process.env.
# NOTE(review): nothing in these examples loads this file — presumably a loader
# such as dotenv is used at startup; confirm.
REDIS_HOST=127.0.0.1
REDIS_PORT=6379
REDIS_PASSWORD= # leave blank if no auth
Creating a Queue: Initialise a Bull queue connected to Redis. Queues are reusable — import the same instance in producers and consumers.
queues/emailQueue.js
JS
const Bull = require('bull');

// Default Redis port, used when REDIS_PORT is unset or not a number.
const DEFAULT_REDIS_PORT = 6379;

// Always parse with an explicit radix; a missing variable parses to NaN,
// which must not be handed to the Redis client as a port.
const parsedRedisPort = Number.parseInt(process.env.REDIS_PORT, 10);
const redisPort = Number.isNaN(parsedRedisPort) ? DEFAULT_REDIS_PORT : parsedRedisPort;

/**
 * Shared Bull queue named 'email'.
 *
 * Connection settings come from REDIS_HOST, REDIS_PORT and REDIS_PASSWORD.
 * Import this same instance from both producers and consumers.
 */
const emailQueue = new Bull('email', {
  redis: {
    host: process.env.REDIS_HOST,
    port: redisPort,
    // An empty password string means "no auth", so pass undefined instead.
    password: process.env.REDIS_PASSWORD || undefined,
  },
  defaultJobOptions: {
    attempts: 3, // total attempts per job, including the first run
    backoff: {
      type: 'exponential',
      delay: 2000, // base delay in ms; the wait grows with each failed attempt
    },
    removeOnComplete: 100, // keep last 100 completed jobs
    removeOnFail: 50, // keep last 50 failed jobs
  },
});

module.exports = emailQueue;
Adding Jobs to the Queue: Push jobs from controllers or services. Jobs can have custom options like delay, priority, and repeat schedules.
controllers/userController.js
JS
const emailQueue = require('../queues/emailQueue');

/**
 * Adds the three example jobs to the email queue: a plain job, a job with
 * options, and a repeating (cron) job.
 *
 * The calls live in an async function because this file is a CommonJS module
 * (it uses require), where top-level await is a syntax error.
 *
 * @returns {Promise<void>} Resolves once all three jobs have been added.
 */
async function enqueueEmailJobs() {
  // Add a simple job
  await emailQueue.add({ to: 'user@example.com', subject: 'Welcome' });

  // Add job with options
  await emailQueue.add(
    { to: 'admin@example.com', type: 'report' },
    {
      delay: 5000, // wait 5 seconds before processing
      priority: 1, // lower number = higher priority
      jobId: 'unique-id', // prevent duplicate jobs
    }
  );

  // Add a repeating job (cron)
  await emailQueue.add(
    { type: 'digest' },
    { repeat: { cron: '0 8 * * *' } } // every day at 08:00
  );
}

// Run on load, as the original top-level statements intended, and never leave
// the promise floating: a failed add (e.g. Redis unreachable) is reported.
enqueueEmailJobs().catch((err) => {
  console.error('Failed to enqueue email jobs:', err);
});
Processing Jobs (Worker): Define a processor function that Bull calls for each job. Use concurrency to process multiple jobs simultaneously.
workers/emailWorker.js
JS
const emailQueue = require('../queues/emailQueue');
const { sendWelcomeEmail } = require('../services/mailService');

// Number of jobs this worker handles at the same time.
const CONCURRENCY = 5;

/**
 * Processor for a single email job: reports progress, sends the email, and
 * returns the value that is stored as the job result.
 *
 * @param {object} job - Bull job whose data carries `to` and `subject`.
 * @returns {Promise<{sent: boolean}>} Result stored on the completed job.
 */
async function handleEmailJob(job) {
  const recipient = job.data.to;
  const subjectLine = job.data.subject;

  // Progress is reported on a 0-100 scale.
  await job.progress(10);
  await sendWelcomeEmail(recipient, subjectLine);
  await job.progress(100);

  return { sent: true };
}

/** Logs a job that finished successfully, along with its result. */
function onCompleted(job, result) {
  console.log(`Job ${job.id} completed`, result);
}

/** Logs a failed job, including how many attempts have been made so far. */
function onFailed(job, err) {
  console.error(`Job ${job.id} failed (attempt ${job.attemptsMade}):`, err.message);
}

/** Logs a job that Bull reported as stalled. */
function onStalled(job) {
  console.warn(`Job ${job.id} stalled`);
}

emailQueue.process(CONCURRENCY, handleEmailJob);

// Lifecycle events
emailQueue.on('completed', onCompleted);
emailQueue.on('failed', onFailed);
emailQueue.on('stalled', onStalled);

console.log('Email worker is running...');
BullMQ (Modern Alternative): BullMQ is the TypeScript-first successor to Bull with a cleaner API. Install with npm install bullmq.
queues/bullmqExample.js
JS
const { Queue, Worker, QueueEvents } = require('bullmq');
// The worker below calls sendWelcomeEmail, so it has to be in scope.
// NOTE(review): path assumed to be the same mail service the Bull worker
// example requires — confirm.
const { sendWelcomeEmail } = require('../services/mailService');

// Parse the port with an explicit radix and fall back to the Redis default
// when REDIS_PORT is unset or not a number (parseInt would yield NaN).
const parsedRedisPort = Number.parseInt(process.env.REDIS_PORT, 10);
const connection = {
  host: process.env.REDIS_HOST,
  port: Number.isNaN(parsedRedisPort) ? 6379 : parsedRedisPort,
};

// Producer
const emailQueue = new Queue('emails', { connection });

/**
 * Adds the example 'sendWelcome' job.
 *
 * Wrapped in an async function because top-level await is a syntax error in
 * a CommonJS module (this file uses require).
 *
 * @returns {Promise<void>} Resolves once the job has been added.
 */
async function enqueueWelcomeEmail() {
  await emailQueue.add('sendWelcome', { to: 'user@example.com' }, { attempts: 3 });
}

enqueueWelcomeEmail().catch((err) => {
  console.error('Failed to enqueue sendWelcome job:', err);
});

// Worker: handles up to 5 jobs from the 'emails' queue concurrently.
const worker = new Worker('emails', async (job) => {
  if (job.name === 'sendWelcome') {
    await sendWelcomeEmail(job.data.to);
  }
  return { done: true }; // stored as the job's return value
}, { connection, concurrency: 5 });

// Report worker-level errors (e.g. connection problems) instead of leaving
// the 'error' event without a listener.
worker.on('error', (err) => {
  console.error('Worker error:', err);
});

// Events
const events = new QueueEvents('emails', { connection });
events.on('completed', ({ jobId }) => console.log('Done:', jobId));
events.on('failed', ({ jobId, failedReason }) => console.error(jobId, failedReason));
Failed Job Handling: Inspect and retry failed jobs manually, or clean them from the queue using Bull's API methods.
scripts/retryFailedJobs.js
JS
const emailQueue = require('../queues/emailQueue');

/**
 * One-off maintenance script for the email queue.
 *
 * Lists the failed jobs, retries each of them, cleans failed jobs older than
 * one hour, and demonstrates pause/resume. The queue is always closed before
 * exiting, and the process exits with code 1 if any step failed.
 */
(async () => {
  let exitCode = 0;

  try {
    // Get all failed jobs
    const failedJobs = await emailQueue.getFailed();
    console.log(`Failed jobs: ${failedJobs.length}`);

    // Retry each failed job. A single job that cannot be retried must not
    // stop the remaining jobs from being retried.
    for (const job of failedJobs) {
      console.log(`Retrying job ${job.id}:`, job.data);
      try {
        await job.retry();
      } catch (err) {
        console.error(`Could not retry job ${job.id}:`, err.message);
        exitCode = 1;
      }
    }

    // Or clean all failed jobs older than 1 hour
    await emailQueue.clean(
      3600 * 1000, // grace period in ms
      'failed'
    );

    // Pause / resume queue
    await emailQueue.pause();
    await emailQueue.resume();
  } catch (err) {
    console.error('Failed-job maintenance aborted:', err);
    exitCode = 1;
  } finally {
    // Release the Redis connections held by the queue before exiting.
    try {
      await emailQueue.close();
    } catch (err) {
      console.error('Could not close the email queue:', err.message);
      exitCode = 1;
    }
  }

  process.exit(exitCode);
})();