Develop

RabbitMQ 消息队列实战:从基础概念到生产环境的可靠消息传递

✎ -- 字 🕐 -- 分钟
字号
RabbitMQ 消息队列实战

消息队列的作用

消息队列实现系统间的异步通信解耦削峰填谷。RabbitMQ 是最广泛使用的开源消息中间件之一,支持 AMQP 0-9-1 协议。

1. 核心概念

Producer → Exchange → Queue → Consumer
              ↓
          Binding(路由规则)

Exchange 类型:
- Direct:精确匹配 routing key
- Topic:通配符匹配(* 匹配一个词,# 匹配零或多个词)
- Fanout:广播到所有绑定的 Queue
- Headers:根据消息头匹配

2. Node.js 生产消费实战

// producer.js
const amqp = require('amqplib');

async function publish() {
    const conn = await amqp.connect('amqp://localhost');
    const channel = await conn.createChannel();
    
    // 声明持久化 Exchange
    await channel.assertExchange('orders', 'topic', { durable: true });
    
    // 发送持久化消息
    const order = { id: 123, amount: 99.99 };
    channel.publish('orders', 'order.created',
        Buffer.from(JSON.stringify(order)),
        { persistent: true, contentType: 'application/json' }
    );
    
    console.log('订单消息已发送');
    setTimeout(() => { conn.close(); process.exit(0); }, 500);
}
// consumer.js
async function consume() {
    const conn = await amqp.connect('amqp://localhost');
    const channel = await conn.createChannel();

    await channel.assertExchange('orders', 'topic', { durable: true });
    
    // 声明队列
    const q = await channel.assertQueue('order-service', { durable: true });
    
    // 绑定
    await channel.bindQueue(q.queue, 'orders', 'order.*');
    
    // 预取控制(公平分发)
    channel.prefetch(1);
    
    console.log('等待订单消息...');
    channel.consume(q.queue, (msg) => {
        const order = JSON.parse(msg.content.toString());
        console.log('处理订单:', order.id);
        
        // 手动确认
        channel.ack(msg);
    });
}

3. 消息可靠性保证

// 生产端确认(Publisher Confirms)
const confirmed = await new Promise((resolve) => {
    channel.confirmSelect();
    channel.once('error', () => resolve(false));
    channel.publish('orders', 'order.created', buf, { persistent: true }, (err) => {
        resolve(!err);
    });
});

// 消费端确认
channel.consume(q.queue, async (msg) => {
    try {
        await processOrder(msg);
        channel.ack(msg);  // 成功确认
    } catch (err) {
        channel.nack(msg, false, true);  // 拒绝并重新入队
    }
});

4. 死信队列(DLQ)

await channel.assertQueue('main-queue', {
    durable: true,
    deadLetterExchange: 'dlx',
    deadLetterRoutingKey: 'failed',
    messageTtl: 1000 * 60 * 5,  // 5 分钟未消费则转入死信
});

await channel.assertExchange('dlx', 'fanout', { durable: true });
await channel.assertQueue('dead-letter-queue', { durable: true });
await channel.bindQueue('dead-letter-queue', 'dlx', '');

总结

RabbitMQ 的生产环境部署需要精心设计消息确认机制、持久化策略和死信处理。掌握这些,才能构建真正可靠的异步消息系统。