消息队列的作用
消息队列实现系统间的异步通信、解耦和削峰填谷。RabbitMQ 是最广泛使用的开源消息中间件之一,支持 AMQP 0-9-1 协议。
1. 核心概念
Producer → Exchange → Queue → Consumer
↓
Binding(路由规则)
Exchange 类型:
- Direct:精确匹配 routing key
- Topic:通配符匹配(* 匹配一个词,# 匹配零或多个词)
- Fanout:广播到所有绑定的 Queue
- Headers:根据消息头匹配
2. Node.js 生产消费实战
// producer.js
const amqp = require('amqplib');
async function publish() {
const conn = await amqp.connect('amqp://localhost');
const channel = await conn.createChannel();
// 声明持久化 Exchange
await channel.assertExchange('orders', 'topic', { durable: true });
// 发送持久化消息
const order = { id: 123, amount: 99.99 };
channel.publish('orders', 'order.created',
Buffer.from(JSON.stringify(order)),
{ persistent: true, contentType: 'application/json' }
);
console.log('订单消息已发送');
setTimeout(() => { conn.close(); process.exit(0); }, 500);
}
// consumer.js
async function consume() {
const conn = await amqp.connect('amqp://localhost');
const channel = await conn.createChannel();
await channel.assertExchange('orders', 'topic', { durable: true });
// 声明队列
const q = await channel.assertQueue('order-service', { durable: true });
// 绑定
await channel.bindQueue(q.queue, 'orders', 'order.*');
// 预取控制(公平分发)
channel.prefetch(1);
console.log('等待订单消息...');
channel.consume(q.queue, (msg) => {
const order = JSON.parse(msg.content.toString());
console.log('处理订单:', order.id);
// 手动确认
channel.ack(msg);
});
}
3. 消息可靠性保证
// 生产端确认(Publisher Confirms)
const confirmed = await new Promise((resolve) => {
channel.confirmSelect();
channel.once('error', () => resolve(false));
channel.publish('orders', 'order.created', buf, { persistent: true }, (err) => {
resolve(!err);
});
});
// 消费端确认
channel.consume(q.queue, async (msg) => {
try {
await processOrder(msg);
channel.ack(msg); // 成功确认
} catch (err) {
channel.nack(msg, false, true); // 拒绝并重新入队
}
});
4. 死信队列(DLQ)
await channel.assertQueue('main-queue', {
durable: true,
deadLetterExchange: 'dlx',
deadLetterRoutingKey: 'failed',
messageTtl: 1000 * 60 * 5, // 5 分钟未消费则转入死信
});
await channel.assertExchange('dlx', 'fanout', { durable: true });
await channel.assertQueue('dead-letter-queue', { durable: true });
await channel.bindQueue('dead-letter-queue', 'dlx', '');
总结
RabbitMQ 的生产环境部署需要精心设计消息确认机制、持久化策略和死信处理。掌握这些,才能构建真正可靠的异步消息系统。