Node.js

Queues & Jobs

Background job processing with Bull and BullMQ — create queues, add jobs, process them with workers, handle failures, and connect to Redis.

Bull Install: Install Bull and its peer dependency ioredis. Bull requires a running Redis instance.
📄terminal
JS
npm install bull ioredis

# .env
REDIS_HOST=127.0.0.1
REDIS_PORT=6379
REDIS_PASSWORD=            # leave blank if no auth
Creating a Queue: Initialise a Bull queue connected to Redis. Queues are reusable — import the same instance in producers and consumers.
📄queues/emailQueue.js
JS
const Bull = require('bull');

const emailQueue = new Bull('email', {
    redis: {
        host:     process.env.REDIS_HOST,
        port:     parseInt(process.env.REDIS_PORT),
        password: process.env.REDIS_PASSWORD || undefined,
    },
    defaultJobOptions: {
        attempts:    3,                    // retry up to 3 times on failure
        backoff: {
            type:  'exponential',
            delay: 2000,                   // 2s, 4s, 8s
        },
        removeOnComplete: 100,             // keep last 100 completed jobs
        removeOnFail:     50,
    },
});

module.exports = emailQueue;
Adding Jobs to the Queue: Push jobs from controllers or services. Jobs can have custom options like delay, priority, and repeat schedules.
📄controllers/userController.js
JS
const emailQueue = require('../queues/emailQueue');

// Add a simple job
await emailQueue.add({ to: 'user@example.com', subject: 'Welcome' });

// Add job with options
await emailQueue.add(
    { to: 'admin@example.com', type: 'report' },
    {
        delay:    5000,         // wait 5 seconds before processing
        priority: 1,            // lower number = higher priority
        jobId:    'unique-id',  // prevent duplicate jobs
    }
);

// Add a repeating job (cron)
await emailQueue.add(
    { type: 'digest' },
    { repeat: { cron: '0 8 * * *' } }   // every day at 08:00
);
Processing Jobs (Worker): Define a processor function that Bull calls for each job. Use concurrency to process multiple jobs simultaneously.
📄workers/emailWorker.js
JS
const emailQueue = require('../queues/emailQueue');
const { sendWelcomeEmail } = require('../services/mailService');

// Process 5 jobs concurrently
emailQueue.process(5, async (job) => {
    const { to, subject } = job.data;

    // Update progress (0-100)
    await job.progress(10);
    await sendWelcomeEmail(to, subject);
    await job.progress(100);

    return { sent: true };   // stored as job result
});

// Lifecycle events
emailQueue.on('completed', (job, result) => {
    console.log(`Job ${job.id} completed`, result);
});

emailQueue.on('failed', (job, err) => {
    console.error(`Job ${job.id} failed (attempt ${job.attemptsMade}):`, err.message);
});

emailQueue.on('stalled', (job) => {
    console.warn(`Job ${job.id} stalled`);
});

console.log('Email worker is running...');
BullMQ (Modern Alternative): BullMQ is the TypeScript-first successor to Bull with a cleaner API. Install with npm install bullmq.
📄queues/bullmqExample.js
JS
const { Queue, Worker, QueueEvents } = require('bullmq');

const connection = { host: process.env.REDIS_HOST, port: parseInt(process.env.REDIS_PORT) };

// Producer
const emailQueue = new Queue('emails', { connection });

await emailQueue.add('sendWelcome', { to: 'user@example.com' }, { attempts: 3 });

// Worker
const worker = new Worker('emails', async (job) => {
    if (job.name === 'sendWelcome') {
        await sendWelcomeEmail(job.data.to);
    }
    return { done: true };
}, { connection, concurrency: 5 });

// Events
const events = new QueueEvents('emails', { connection });
events.on('completed', ({ jobId }) => console.log('Done:', jobId));
events.on('failed',    ({ jobId, failedReason }) => console.error(jobId, failedReason));
Failed Job Handling: Inspect and retry failed jobs manually, or clean them from the queue using Bull's API methods.
📄scripts/retryFailedJobs.js
JS
const emailQueue = require('../queues/emailQueue');

(async () => {
    // Get all failed jobs
    const failedJobs = await emailQueue.getFailed();
    console.log(`Failed jobs: ${failedJobs.length}`);

    // Retry each failed job
    for (const job of failedJobs) {
        console.log(`Retrying job ${job.id}:`, job.data);
        await job.retry();
    }

    // Or clean all failed jobs older than 1 hour
    await emailQueue.clean(
        3600 * 1000,  // grace period in ms
        'failed'
    );

    // Pause / resume queue
    await emailQueue.pause();
    await emailQueue.resume();

    process.exit(0);
})();