Solidis LogoSolidis
← Back to Tutorials
Intermediate
35 min

Building a Job Queue System

Create a robust background job processing system using Redis lists for reliable task execution.

What You'll Build
  • Priority-based job queue
  • Multiple worker support
  • Job retry mechanism
  • Dead letter queue for failed jobs
1
Job Queue Manager
1import { SolidisFeaturedClient } from '@vcms-io/solidis/featured';
2import { v4 as uuidv4 } from 'uuid';
3
4export interface Job<T = any> {
5  id: string;
6  type: string;
7  data: T;
8  priority: number;
9  attempts: number;
10  maxAttempts: number;
11  createdAt: number;
12  processedAt?: number;
13}
14
15export class JobQueue {
16  private client: SolidisFeaturedClient;
17  private queueName: string;
18  private processingName: string;
19  private deadLetterName: string;
20
21  constructor(options: { host?: string; port?: number; queueName?: string } = {}) {
22    this.client = new SolidisFeaturedClient({
23      host: options.host || '127.0.0.1',
24      port: options.port || 6379,
25    });
26    this.queueName = options.queueName || 'jobs';
27    this.processingName = `${this.queueName}:processing`;
28    this.deadLetterName = `${this.queueName}:dead`;
29  }
30
31  async connect(): Promise<void> {
32    await this.client.connect();
33  }
34
35  /**
36   * Add a job to the queue
37   */
38  async addJob<T>(
39    type: string,
40    data: T,
41    options: { priority?: number; maxAttempts?: number } = {}
42  ): Promise<string> {
43    const job: Job<T> = {
44      id: uuidv4(),
45      type,
46      data,
47      priority: options.priority || 0,
48      attempts: 0,
49      maxAttempts: options.maxAttempts || 3,
50      createdAt: Date.now(),
51    };
52
53    // Add to sorted set with priority as score (higher = more priority)
54    await this.client.zadd(
55      this.queueName,
56      -job.priority, // Negative for descending order
57      JSON.stringify(job)
58    );
59
60    return job.id;
61  }
62
63  /**
64   * Get next job from queue
65   */
66  async getNextJob(): Promise<Job | null> {
67    // Get highest priority job
68    const jobs = await this.client.zpopmax(this.queueName);
69
70    if (jobs.length === 0) {
71      return null;
72    }
73
74    const job = JSON.parse(jobs[0].toString()) as Job;
75
76    // Move to processing queue
77    await this.client.hset(this.processingName, job.id, JSON.stringify(job));
78
79    return job;
80  }
81
82  /**
83   * Complete a job
84   */
85  async completeJob(jobId: string): Promise<void> {
86    await this.client.hdel(this.processingName, jobId);
87  }
88
89  /**
90   * Fail a job (retry or move to dead letter)
91   */
92  async failJob(jobId: string, error: string): Promise<void> {
93    const jobData = await this.client.hget(this.processingName, jobId);
94
95    if (!jobData) {
96      return;
97    }
98
99    const job = JSON.parse(jobData.toString()) as Job;
100    job.attempts++;
101
102    if (job.attempts >= job.maxAttempts) {
103      // Move to dead letter queue
104      await this.client.lpush(
105        this.deadLetterName,
106        JSON.stringify({ ...job, error })
107      );
108      await this.client.hdel(this.processingName, jobId);
109    } else {
110      // Retry: move back to main queue
111      await this.client.zadd(
112        this.queueName,
113        -job.priority,
114        JSON.stringify(job)
115      );
116      await this.client.hdel(this.processingName, jobId);
117    }
118  }
119
120  /**
121   * Get queue statistics
122   */
123  async getStats() {
124    const waiting = await this.client.zcard(this.queueName);
125    const processing = await this.client.hlen(this.processingName);
126    const dead = await this.client.llen(this.deadLetterName);
127
128    return { waiting, processing, dead };
129  }
130
131  /**
132   * Clear dead letter queue
133   */
134  async clearDeadLetters(): Promise<number> {
135    const length = await this.client.llen(this.deadLetterName);
136    if (length > 0) {
137      await this.client.del(this.deadLetterName);
138    }
139    return length;
140  }
141}
2
Job Worker
1import { JobQueue, Job } from './job-queue';
2
3type JobHandler<T = any> = (data: T) => Promise<void>;
4
5export class JobWorker {
6  private queue: JobQueue;
7  private handlers: Map<string, JobHandler>;
8  private running: boolean = false;
9  private pollInterval: number;
10
11  constructor(queue: JobQueue, pollInterval: number = 1000) {
12    this.queue = queue;
13    this.handlers = new Map();
14    this.pollInterval = pollInterval;
15  }
16
17  /**
18   * Register a job handler
19   */
20  register<T>(type: string, handler: JobHandler<T>): void {
21    this.handlers.set(type, handler);
22  }
23
24  /**
25   * Start processing jobs
26   */
27  async start(): Promise<void> {
28    this.running = true;
29    console.log('Worker started');
30
31    while (this.running) {
32      try {
33        const job = await this.queue.getNextJob();
34
35        if (job) {
36          await this.processJob(job);
37        } else {
38          // No jobs available, wait before polling again
39          await new Promise((resolve) =>
40            setTimeout(resolve, this.pollInterval)
41          );
42        }
43      } catch (error) {
44        console.error('Worker error:', error);
45        await new Promise((resolve) => setTimeout(resolve, 1000));
46      }
47    }
48  }
49
50  /**
51   * Stop processing jobs
52   */
53  stop(): void {
54    this.running = false;
55    console.log('Worker stopped');
56  }
57
58  private async processJob(job: Job): Promise<void> {
59    const handler = this.handlers.get(job.type);
60
61    if (!handler) {
62      console.error(`No handler for job type: ${job.type}`);
63      await this.queue.failJob(
64        job.id,
65        `No handler registered for type: ${job.type}`
66      );
67      return;
68    }
69
70    try {
71      console.log(`Processing job ${job.id} (type: ${job.type})`);
72      await handler(job.data);
73      await this.queue.completeJob(job.id);
74      console.log(`Completed job ${job.id}`);
75    } catch (error) {
76      const message = error instanceof Error ? error.message : 'Unknown error';
77      console.error(`Failed job ${job.id}:`, message);
78      await this.queue.failJob(job.id, message);
79    }
80  }
81}
3
Usage Example
1import { JobQueue } from './job-queue';
2import { JobWorker } from './job-worker';
3
4// Create queue
5const queue = new JobQueue({
6  host: '127.0.0.1',
7  port: 6379,
8  queueName: 'myapp:jobs',
9});
10
11await queue.connect();
12
13// Create worker
14const worker = new JobWorker(queue);
15
16// Register job handlers
17worker.register('send-email', async (data: { to: string; subject: string }) => {
18  console.log(`Sending email to ${data.to}`);
19  await sendEmail(data.to, data.subject);
20});
21
22worker.register('process-image', async (data: { url: string }) => {
23  console.log(`Processing image: ${data.url}`);
24  await processImage(data.url);
25});
26
27worker.register('generate-report', async (data: { userId: string }) => {
28  console.log(`Generating report for user ${data.userId}`);
29  await generateReport(data.userId);
30});
31
32// Start worker
33worker.start();
34
35// Add jobs from your application
36await queue.addJob('send-email', {
37  to: 'user@example.com',
38  subject: 'Welcome!',
39}, { priority: 5 });
40
41await queue.addJob('process-image', {
42  url: 'https://example.com/image.jpg',
43}, { priority: 3 });
44
45await queue.addJob('generate-report', {
46  userId: '123',
47}, { priority: 1, maxAttempts: 5 });
48
49// Get queue stats
50const stats = await queue.getStats();
51console.log('Queue stats:', stats);
52
53// Graceful shutdown
54process.on('SIGTERM', () => {
55  worker.stop();
56});
Advanced Features
  • Delayed Jobs
    Schedule jobs to run at a specific time
  • Job Progress Tracking
    Update and monitor job progress in real-time
  • Multiple Workers
    Scale horizontally by running multiple worker processes
  • Job Deduplication
    Prevent duplicate jobs from being queued