← Back to Tutorials
Advanced
45 min
Building a Real-time Chat Application
Create a scalable real-time chat system using Redis Pub/Sub with Solidis and WebSockets.
What You'll Build
- Real-time message broadcasting with Redis Pub/Sub
- WebSocket server integration
- Multiple chat rooms support
- Message history and persistence
1
Chat Manager with Pub/Sub
1import { SolidisFeaturedClient } from '@vcms-io/solidis/featured';
2
/**
 * A single chat message as it is published to a room channel and stored in
 * that room's history.
 */
export interface Message {
  /** Unique identifier of the message. */
  id: string;
  /** Room the message belongs to; used to build the channel and history keys. */
  roomId: string;
  /** Identifier of the sending user. */
  userId: string;
  /** Display name of the sending user. */
  username: string;
  /** Message text. */
  content: string;
  /** Numeric timestamp; used as the sorted-set score when the message is stored. */
  timestamp: number;
}
11
/**
 * Chat backend built on two Redis clients: one used for publishing and for
 * regular commands (history, presence), and one dedicated to subscriptions.
 *
 * Any number of handlers may listen to the same room. The room's channel is
 * subscribed when the first handler joins and unsubscribed when the last one
 * leaves.
 */
export class ChatManager {
  /** Prefix that turns a room id into a Pub/Sub channel name. */
  private static readonly CHANNEL_PREFIX = 'chat:';
  /** Number of most recent messages kept per room. */
  private static readonly HISTORY_SIZE = 100;
  /** Lifetime of a room's history key (7 days). */
  private static readonly HISTORY_TTL_SECONDS = 7 * 24 * 3600;
  /** Lifetime of a room's active-user set (5 minutes). */
  private static readonly PRESENCE_TTL_SECONDS = 300;

  private publisher: SolidisFeaturedClient;
  private subscriber: SolidisFeaturedClient;
  /** Handlers registered per room id. */
  private messageHandlers: Map<string, Set<(message: Message) => void>>;
  /** Ensures the 'message' listener is attached only once across connect() calls. */
  private listenerAttached = false;

  /**
   * @param options Redis connection settings; a missing (or falsy) host or
   *   port falls back to 127.0.0.1:6379.
   */
  constructor(options: { host?: string; port?: number } = {}) {
    const host = options.host || '127.0.0.1';
    const port = options.port || 6379;

    // Separate clients for pub and sub
    this.publisher = new SolidisFeaturedClient({ host, port });
    this.subscriber = new SolidisFeaturedClient({ host, port });

    this.messageHandlers = new Map();
  }

  /**
   * Connects both clients and starts routing incoming channel messages to
   * the handlers registered through joinRoom().
   */
  async connect(): Promise<void> {
    await this.publisher.connect();
    await this.subscriber.connect();

    // A second connect() must not add a second listener, otherwise every
    // message would be delivered to each handler twice.
    if (this.listenerAttached) {
      return;
    }
    this.listenerAttached = true;

    this.subscriber.on('message', (channel: Buffer, message: Buffer) => {
      this.dispatch(channel.toString(), message.toString());
    });
  }

  /**
   * Routes one raw channel payload to the handlers of the matching room.
   * Malformed payloads and throwing handlers are logged and skipped so that
   * an error never escapes into the subscriber's event emitter.
   */
  private dispatch(channel: string, payload: string): void {
    const prefix = ChatManager.CHANNEL_PREFIX;
    if (!channel.startsWith(prefix)) {
      return;
    }

    const handlers = this.messageHandlers.get(channel.slice(prefix.length));
    if (!handlers || handlers.size === 0) {
      return;
    }

    let data: Message;
    try {
      data = JSON.parse(payload) as Message;
    } catch (error) {
      console.error(`Ignoring malformed message on ${channel}:`, error);
      return;
    }

    // Iterate over a copy: a handler may leave the room while being called.
    for (const handler of [...handlers]) {
      try {
        handler(data);
      } catch (error) {
        console.error(`Message handler for ${channel} failed:`, error);
      }
    }
  }

  /**
   * Join a chat room.
   *
   * @param roomId Room to listen to.
   * @param onMessage Called for every message published to the room.
   * @throws Whatever the subscriber client's subscribe() rejects with; the
   *   handler is not kept registered in that case.
   */
  async joinRoom(
    roomId: string,
    onMessage: (message: Message) => void
  ): Promise<void> {
    const existing = this.messageHandlers.get(roomId);
    if (existing) {
      // Channel is already subscribed; just add another listener.
      existing.add(onMessage);
      return;
    }

    const handlers = new Set([onMessage]);
    this.messageHandlers.set(roomId, handlers);

    try {
      await this.subscriber.subscribe(`${ChatManager.CHANNEL_PREFIX}${roomId}`);
    } catch (error) {
      // Do not leave a handler behind for a room that was never subscribed.
      handlers.delete(onMessage);
      if (handlers.size === 0 && this.messageHandlers.get(roomId) === handlers) {
        this.messageHandlers.delete(roomId);
      }
      throw error;
    }
  }

  /**
   * Leave a chat room.
   *
   * @param roomId Room to stop listening to.
   * @param onMessage When given, only this handler is removed and the channel
   *   stays subscribed while other handlers remain. When omitted, every
   *   handler of the room is removed and the channel is unsubscribed.
   */
  async leaveRoom(
    roomId: string,
    onMessage?: (message: Message) => void
  ): Promise<void> {
    const handlers = this.messageHandlers.get(roomId);
    if (onMessage && handlers) {
      handlers.delete(onMessage);
      if (handlers.size > 0) {
        return;
      }
    }

    this.messageHandlers.delete(roomId);
    await this.subscriber.unsubscribe(`${ChatManager.CHANNEL_PREFIX}${roomId}`);
  }

  /**
   * Send a message to a room: publish it to the room's channel, then append
   * it to the room's history.
   */
  async sendMessage(message: Message): Promise<void> {
    const channel = `${ChatManager.CHANNEL_PREFIX}${message.roomId}`;

    // Publish to channel
    await this.publisher.publish(channel, JSON.stringify(message));

    // Store in message history
    await this.storeMessage(message);
  }

  /**
   * Store message in history (a sorted set scored by timestamp), trim the
   * set to the most recent HISTORY_SIZE entries and refresh its expiry.
   */
  private async storeMessage(message: Message): Promise<void> {
    const key = `chat:history:${message.roomId}`;

    // Store as sorted set with timestamp as score
    await this.publisher.zadd(
      key,
      message.timestamp,
      JSON.stringify(message)
    );

    // Keep only the newest HISTORY_SIZE messages: drop ranks 0..-(size + 1)
    await this.publisher.zremrangebyrank(
      key,
      0,
      -(ChatManager.HISTORY_SIZE + 1)
    );

    // Set expiry
    await this.publisher.expire(key, ChatManager.HISTORY_TTL_SECONDS);
  }

  /**
   * Get message history.
   *
   * @param roomId Room whose history is read.
   * @param limit Maximum number of most recent messages to return. Values
   *   below 1 (or NaN) yield an empty list; fractional values are rounded
   *   down; values above the stored history size are capped to it.
   * @returns The stored messages in the order returned by the sorted set.
   */
  async getHistory(
    roomId: string,
    limit: number = 50
  ): Promise<Message[]> {
    const count = Math.min(Math.floor(limit), ChatManager.HISTORY_SIZE);

    // A count of 0 would turn the range into 0..-1, i.e. the whole set,
    // so answer "nothing requested" without querying at all.
    if (!(count >= 1)) {
      return [];
    }

    const key = `chat:history:${roomId}`;

    // Get last N messages
    const messages = await this.publisher.zrange(key, -count, -1);

    return messages.map((msg) => JSON.parse(msg.toString()) as Message);
  }

  /**
   * Get active users in a room.
   *
   * @returns The members of the room's user set, as strings.
   */
  async getActiveUsers(roomId: string): Promise<string[]> {
    const key = `chat:users:${roomId}`;
    const members = await this.publisher.smembers(key);
    return members.map((m) => m.toString());
  }

  /**
   * Mark user as active in room. The expiry is set on the room's whole user
   * set, so each call renews it for every member.
   */
  async markUserActive(roomId: string, userId: string): Promise<void> {
    const key = `chat:users:${roomId}`;
    await this.publisher.sadd(key, userId);
    await this.publisher.expire(key, ChatManager.PRESENCE_TTL_SECONDS);
  }

  /**
   * Remove user from room.
   */
  async removeUser(roomId: string, userId: string): Promise<void> {
    const key = `chat:users:${roomId}`;
    await this.publisher.srem(key, userId);
  }
}
WebSocket Server Integration
1import { WebSocketServer, WebSocket } from 'ws';
2import { ChatManager, Message } from './chat-manager';
3import { v4 as uuidv4 } from 'uuid';
4
/** Per-connection state the server keeps for each WebSocket client. */
interface ClientData {
  /** Identifier assigned to the client when it connects. */
  userId: string;
  /** Display name; 'Anonymous' until the client supplies one on join. */
  username: string;
  /** Room the client is currently in, or null when it is in no room. */
  roomId: string | null;
}
10
/**
 * WebSocket front end for a ChatManager.
 *
 * Clients send JSON frames of type 'join', 'leave', 'message' or 'history'.
 * The server keeps its own per-room set of sockets and registers a single
 * handler per room with the chat manager, so every client in a room receives
 * every message and one client leaving does not affect the others.
 */
export class ChatServer {
  private wss: WebSocketServer;
  private chat: ChatManager;
  private clients: Map<WebSocket, ClientData>;
  /** Sockets connected to this server, grouped by the room they are in. */
  private rooms: Map<string, Set<WebSocket>>;

  /**
   * @param port Port the WebSocket server listens on.
   * @param chat Chat manager used for rooms, history and presence.
   */
  constructor(port: number, chat: ChatManager) {
    this.wss = new WebSocketServer({ port });
    this.chat = chat;
    this.clients = new Map();
    this.rooms = new Map();

    this.setupServer();
  }

  /** Wires connection, message and close handling for every new socket. */
  private setupServer(): void {
    this.wss.on('connection', (ws: WebSocket) => {
      console.log('New client connected');

      // Initialize client data
      this.clients.set(ws, {
        userId: uuidv4(),
        username: 'Anonymous',
        roomId: null,
      });

      ws.on('message', async (data: Buffer) => {
        let message: unknown;
        try {
          message = JSON.parse(data.toString());
        } catch {
          this.sendError(ws, 'Invalid message format');
          return;
        }

        // Failures past this point are server-side, not a client formatting
        // problem, so they are reported with a different message.
        try {
          await this.handleMessage(ws, message);
        } catch (error) {
          console.error('Error handling message:', error);
          this.sendError(ws, 'Failed to process message');
        }
      });

      ws.on('close', async () => {
        const client = this.clients.get(ws);
        this.clients.delete(ws);
        try {
          if (client) {
            await this.leaveCurrentRoom(ws, client);
          }
        } catch (error) {
          // Nothing can be reported to a closed socket; avoid an unhandled rejection.
          console.error('Error cleaning up client:', error);
        }
        console.log('Client disconnected');
      });
    });
  }

  /** Validates the frame's shape and dispatches it by its `type` field. */
  private async handleMessage(ws: WebSocket, message: unknown): Promise<void> {
    const client = this.clients.get(ws);
    if (!client) return;

    if (typeof message !== 'object' || message === null) {
      this.sendError(ws, 'Invalid message format');
      return;
    }
    const request = message as Record<string, unknown>;

    switch (request.type) {
      case 'join':
        await this.handleJoin(ws, client, request.roomId, request.username);
        break;

      case 'leave':
        await this.handleLeave(ws, client);
        break;

      case 'message':
        await this.handleChatMessage(ws, client, request.content);
        break;

      case 'history':
        await this.handleHistory(ws, client);
        break;

      default:
        this.sendError(ws, 'Unknown message type');
    }
  }

  /**
   * Moves the client into `roomId` (leaving its current room first), then
   * sends a 'joined' confirmation followed by the room's history.
   */
  private async handleJoin(
    ws: WebSocket,
    client: ClientData,
    roomId: unknown,
    username: unknown
  ): Promise<void> {
    if (typeof roomId !== 'string' || roomId.length === 0) {
      this.sendError(ws, 'Invalid room id');
      return;
    }

    // Leave current room if any (also removes the user from its presence set)
    await this.leaveCurrentRoom(ws, client);

    // Update client data
    client.username =
      typeof username === 'string' && username.length > 0
        ? username
        : 'Anonymous';

    // Join new room
    await this.enterRoom(ws, client, roomId);

    // Mark user as active
    await this.chat.markUserActive(roomId, client.userId);

    // Send join confirmation
    this.sendToClient(ws, {
      type: 'joined',
      roomId,
      userId: client.userId,
    });

    // Send message history
    const history = await this.chat.getHistory(roomId);
    this.sendToClient(ws, {
      type: 'history',
      messages: history,
    });
  }

  /** Leaves the current room, if any, and confirms with a 'left' frame. */
  private async handleLeave(ws: WebSocket, client: ClientData): Promise<void> {
    if (await this.leaveCurrentRoom(ws, client)) {
      this.sendToClient(ws, { type: 'left' });
    }
  }

  /** Publishes a chat message from the client to its current room. */
  private async handleChatMessage(
    ws: WebSocket,
    client: ClientData,
    content: unknown
  ): Promise<void> {
    if (!client.roomId) {
      this.sendError(ws, 'Not in a room');
      return;
    }

    if (typeof content !== 'string' || content.length === 0) {
      this.sendError(ws, 'Invalid message content');
      return;
    }

    const message: Message = {
      id: uuidv4(),
      roomId: client.roomId,
      userId: client.userId,
      username: client.username,
      content,
      timestamp: Date.now(),
    };

    await this.chat.sendMessage(message);
  }

  /** Sends the current room's history to the client. */
  private async handleHistory(ws: WebSocket, client: ClientData): Promise<void> {
    if (!client.roomId) {
      this.sendError(ws, 'Not in a room');
      return;
    }

    const history = await this.chat.getHistory(client.roomId);
    this.sendToClient(ws, {
      type: 'history',
      messages: history,
    });
  }

  /**
   * Adds the socket to the room's local member set. The first local member
   * also joins the room on the chat manager, with one handler that fans each
   * message out to all local members.
   */
  private async enterRoom(
    ws: WebSocket,
    client: ClientData,
    roomId: string
  ): Promise<void> {
    // Set before awaiting so a 'close' arriving meanwhile still cleans up.
    client.roomId = roomId;

    const existing = this.rooms.get(roomId);
    if (existing) {
      existing.add(ws);
      return;
    }

    const members = new Set<WebSocket>([ws]);
    this.rooms.set(roomId, members);

    try {
      await this.chat.joinRoom(roomId, (message) => {
        this.broadcast(roomId, { type: 'message', data: message });
      });
    } catch (error) {
      // Roll back so a failed join does not leave a phantom room behind.
      if (this.rooms.get(roomId) === members) {
        this.rooms.delete(roomId);
      }
      client.roomId = null;
      throw error;
    }
  }

  /**
   * Removes the socket from its current room and the user from that room's
   * presence set. The chat manager's room is left only when no local member
   * remains.
   *
   * @returns true when the client was in a room, false otherwise.
   */
  private async leaveCurrentRoom(
    ws: WebSocket,
    client: ClientData
  ): Promise<boolean> {
    const roomId = client.roomId;
    if (!roomId) {
      return false;
    }
    client.roomId = null;

    const members = this.rooms.get(roomId);
    members?.delete(ws);
    if (!members || members.size === 0) {
      this.rooms.delete(roomId);
      await this.chat.leaveRoom(roomId);
    }

    await this.chat.removeUser(roomId, client.userId);
    return true;
  }

  /** Sends one payload to every local member of a room. */
  private broadcast(roomId: string, data: unknown): void {
    const members = this.rooms.get(roomId);
    if (!members) return;

    for (const member of members) {
      this.sendToClient(member, data);
    }
  }

  /** Serializes `data` as JSON and sends it if the socket is still open. */
  private sendToClient(ws: WebSocket, data: unknown): void {
    if (ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify(data));
    }
  }

  /** Sends an 'error' frame carrying `message`. */
  private sendError(ws: WebSocket, message: string): void {
    this.sendToClient(ws, {
      type: 'error',
      message,
    });
  }
}
Start the Server
1import { ChatManager } from './chat-manager';
2import { ChatServer } from './chat-server';
3
/**
 * Boots the chat backend: connects the chat manager to Redis at
 * 127.0.0.1:6379, then creates the WebSocket chat server on port 8080.
 *
 * Rejects if connecting to Redis fails; the caller is expected to handle it.
 */
async function start(): Promise<void> {
  const chat = new ChatManager({
    host: '127.0.0.1',
    port: 6379,
  });

  await chat.connect();
  console.log('Connected to Redis');

  // The instance is not needed afterwards, so it is not bound to a local.
  new ChatServer(8080, chat);
  console.log('WebSocket server running on ws://localhost:8080');
}
16
17start().catch(console.error);
Client Example
// Simple WebSocket client example
const ws = new WebSocket('ws://localhost:8080');

ws.onopen = () => {
  // Join a room as soon as the connection is open
  ws.send(JSON.stringify({
    type: 'join',
    roomId: 'general',
    username: 'John Doe',
  }));
};

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);

  switch (data.type) {
    case 'joined':
      console.log('Joined room:', data.roomId);
      // Only now is it safe to chat: the socket is open and the server has
      // put us in a room. Sending earlier would throw (socket still
      // connecting) or be rejected with "Not in a room".
      sendMessage('Hello, everyone!');
      break;

    case 'message':
      console.log(`${data.data.username}: ${data.data.content}`);
      break;

    case 'history':
      console.log('Message history:', data.messages);
      break;

    case 'error':
      console.error('Server error:', data.message);
      break;
  }
};

/**
 * Send a chat message to the current room.
 * Does nothing (apart from a warning) while the socket is not open.
 */
function sendMessage(content: string): void {
  if (ws.readyState !== WebSocket.OPEN) {
    console.warn('Cannot send message: connection is not open');
    return;
  }

  ws.send(JSON.stringify({
    type: 'message',
    content,
  }));
}