← Back to Tutorials
Intermediate
35 min
Building a Job Queue System
Create a robust background job processing system using Redis lists for reliable task execution.
What You'll Build
- Priority-based job queue
- Multiple worker support
- Job retry mechanism
- Dead letter queue for failed jobs
1
Job Queue Manager1import { SolidisFeaturedClient } from '@vcms-io/solidis/featured';
2import { v4 as uuidv4 } from 'uuid';
3
4export interface Job<T = any> {
5 id: string;
6 type: string;
7 data: T;
8 priority: number;
9 attempts: number;
10 maxAttempts: number;
11 createdAt: number;
12 processedAt?: number;
13}
14
15export class JobQueue {
16 private client: SolidisFeaturedClient;
17 private queueName: string;
18 private processingName: string;
19 private deadLetterName: string;
20
21 constructor(options: { host?: string; port?: number; queueName?: string } = {}) {
22 this.client = new SolidisFeaturedClient({
23 host: options.host || '127.0.0.1',
24 port: options.port || 6379,
25 });
26 this.queueName = options.queueName || 'jobs';
27 this.processingName = `${this.queueName}:processing`;
28 this.deadLetterName = `${this.queueName}:dead`;
29 }
30
31 async connect(): Promise<void> {
32 await this.client.connect();
33 }
34
35 /**
36 * Add a job to the queue
37 */
38 async addJob<T>(
39 type: string,
40 data: T,
41 options: { priority?: number; maxAttempts?: number } = {}
42 ): Promise<string> {
43 const job: Job<T> = {
44 id: uuidv4(),
45 type,
46 data,
47 priority: options.priority || 0,
48 attempts: 0,
49 maxAttempts: options.maxAttempts || 3,
50 createdAt: Date.now(),
51 };
52
53 // Add to sorted set with priority as score (higher = more priority)
54 await this.client.zadd(
55 this.queueName,
56 -job.priority, // Negative for descending order
57 JSON.stringify(job)
58 );
59
60 return job.id;
61 }
62
63 /**
64 * Get next job from queue
65 */
66 async getNextJob(): Promise<Job | null> {
67 // Get highest priority job
68 const jobs = await this.client.zpopmax(this.queueName);
69
70 if (jobs.length === 0) {
71 return null;
72 }
73
74 const job = JSON.parse(jobs[0].toString()) as Job;
75
76 // Move to processing queue
77 await this.client.hset(this.processingName, job.id, JSON.stringify(job));
78
79 return job;
80 }
81
82 /**
83 * Complete a job
84 */
85 async completeJob(jobId: string): Promise<void> {
86 await this.client.hdel(this.processingName, jobId);
87 }
88
89 /**
90 * Fail a job (retry or move to dead letter)
91 */
92 async failJob(jobId: string, error: string): Promise<void> {
93 const jobData = await this.client.hget(this.processingName, jobId);
94
95 if (!jobData) {
96 return;
97 }
98
99 const job = JSON.parse(jobData.toString()) as Job;
100 job.attempts++;
101
102 if (job.attempts >= job.maxAttempts) {
103 // Move to dead letter queue
104 await this.client.lpush(
105 this.deadLetterName,
106 JSON.stringify({ ...job, error })
107 );
108 await this.client.hdel(this.processingName, jobId);
109 } else {
110 // Retry: move back to main queue
111 await this.client.zadd(
112 this.queueName,
113 -job.priority,
114 JSON.stringify(job)
115 );
116 await this.client.hdel(this.processingName, jobId);
117 }
118 }
119
120 /**
121 * Get queue statistics
122 */
123 async getStats() {
124 const waiting = await this.client.zcard(this.queueName);
125 const processing = await this.client.hlen(this.processingName);
126 const dead = await this.client.llen(this.deadLetterName);
127
128 return { waiting, processing, dead };
129 }
130
131 /**
132 * Clear dead letter queue
133 */
134 async clearDeadLetters(): Promise<number> {
135 const length = await this.client.llen(this.deadLetterName);
136 if (length > 0) {
137 await this.client.del(this.deadLetterName);
138 }
139 return length;
140 }
141}2
Job Worker1import { JobQueue, Job } from './job-queue';
2
3type JobHandler<T = any> = (data: T) => Promise<void>;
4
5export class JobWorker {
6 private queue: JobQueue;
7 private handlers: Map<string, JobHandler>;
8 private running: boolean = false;
9 private pollInterval: number;
10
11 constructor(queue: JobQueue, pollInterval: number = 1000) {
12 this.queue = queue;
13 this.handlers = new Map();
14 this.pollInterval = pollInterval;
15 }
16
17 /**
18 * Register a job handler
19 */
20 register<T>(type: string, handler: JobHandler<T>): void {
21 this.handlers.set(type, handler);
22 }
23
24 /**
25 * Start processing jobs
26 */
27 async start(): Promise<void> {
28 this.running = true;
29 console.log('Worker started');
30
31 while (this.running) {
32 try {
33 const job = await this.queue.getNextJob();
34
35 if (job) {
36 await this.processJob(job);
37 } else {
38 // No jobs available, wait before polling again
39 await new Promise((resolve) =>
40 setTimeout(resolve, this.pollInterval)
41 );
42 }
43 } catch (error) {
44 console.error('Worker error:', error);
45 await new Promise((resolve) => setTimeout(resolve, 1000));
46 }
47 }
48 }
49
50 /**
51 * Stop processing jobs
52 */
53 stop(): void {
54 this.running = false;
55 console.log('Worker stopped');
56 }
57
58 private async processJob(job: Job): Promise<void> {
59 const handler = this.handlers.get(job.type);
60
61 if (!handler) {
62 console.error(`No handler for job type: ${job.type}`);
63 await this.queue.failJob(
64 job.id,
65 `No handler registered for type: ${job.type}`
66 );
67 return;
68 }
69
70 try {
71 console.log(`Processing job ${job.id} (type: ${job.type})`);
72 await handler(job.data);
73 await this.queue.completeJob(job.id);
74 console.log(`Completed job ${job.id}`);
75 } catch (error) {
76 const message = error instanceof Error ? error.message : 'Unknown error';
77 console.error(`Failed job ${job.id}:`, message);
78 await this.queue.failJob(job.id, message);
79 }
80 }
81}3
Usage Example1import { JobQueue } from './job-queue';
2import { JobWorker } from './job-worker';
3
4// Create queue
5const queue = new JobQueue({
6 host: '127.0.0.1',
7 port: 6379,
8 queueName: 'myapp:jobs',
9});
10
11await queue.connect();
12
13// Create worker
14const worker = new JobWorker(queue);
15
16// Register job handlers
17worker.register('send-email', async (data: { to: string; subject: string }) => {
18 console.log(`Sending email to ${data.to}`);
19 await sendEmail(data.to, data.subject);
20});
21
22worker.register('process-image', async (data: { url: string }) => {
23 console.log(`Processing image: ${data.url}`);
24 await processImage(data.url);
25});
26
27worker.register('generate-report', async (data: { userId: string }) => {
28 console.log(`Generating report for user ${data.userId}`);
29 await generateReport(data.userId);
30});
31
32// Start worker
33worker.start();
34
35// Add jobs from your application
36await queue.addJob('send-email', {
37 to: 'user@example.com',
38 subject: 'Welcome!',
39}, { priority: 5 });
40
41await queue.addJob('process-image', {
42 url: 'https://example.com/image.jpg',
43}, { priority: 3 });
44
45await queue.addJob('generate-report', {
46 userId: '123',
47}, { priority: 1, maxAttempts: 5 });
48
49// Get queue stats
50const stats = await queue.getStats();
51console.log('Queue stats:', stats);
52
53// Graceful shutdown
54process.on('SIGTERM', () => {
55 worker.stop();
56});Advanced Features
- ✓Delayed JobsSchedule jobs to run at a specific time
- ✓Job Progress TrackingUpdate and monitor job progress in real-time
- ✓Multiple WorkersScale horizontally by running multiple worker processes
- ✓Job DeduplicationPrevent duplicate jobs from being queued
