前提条件

本教程假定RabbitMQ已安装并在本地主机的标准端口(5672)上运行。如果您使用不同的主机,端口或凭据,连接设置将需要调整。

简介

RabbitMQ是一个消息broker:它接受和转发消息。你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。在这个比喻中,RabbitMQ是整个邮政系统即信箱、邮局和邮递员。 RabbitMQ与邮局的主要区别是它不处理纸张,而是接受,存储和转发数据的二进制数据块——消息。 RabbitMQ和一般的消息传递使用了一些术语。

生产(producing)只不过是发送而已。一个发送消息的程序是一个生产者(producer):

队列(queue)是RabbitMQ内部的邮箱名称。尽管消息流经RabbitMQ和您的应用程序,但它们只能存储在队列(queue)中。队列(queue)只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者(producer)可以发送消息到一个队列(queue),许多消费者(consumers)可以尝试从一个队列(queue)接收数据。这就是我们代表队列(queue)的方式:

消费(consuming)与接受有类似的意义。消费者(consumer)是主要等待接收消息的程序:

请注意,producer,consumer和broker不必在同一主机上;事实上在大多数应用程序中都不会。

(使用amqp.node客户端)

在本教程的这一部分,我们将用Node编写两个小程序;发送单个消息的生产者,以及接收消息并将其打印出来的消费者。我们将详细介绍amqp.node API中的一些细节,将注意力集中在这个非常简单的事情上,以便开始。这是一个消息传递的“Hello World”。 在下图中,“P”是我们的生产者,“C”是我们的消费者。中间的盒子是一个队列——RabbitMQ消费者的消息缓冲区。

amqp.node客户端库 RabbitMQ提供多种协议。本教程使用AMQP 0-9-1,这是一个开放,通用的消息传递协议。 RabbitMQ有许多不同的语言客户端。我们将在本教程中使用amqp.node客户端。 首先,使用npm安装amqp.node:

npm install amqplib

现在我们安装了amqp.node,我们可以写一些代码。

发送端

我们将调用我们的消息发布者(发送者)send.js和我们的消息使用者(接收者)receive.js。发布者将连接到RabbitMQ,发送一条消息,然后退出。 在send.js中,我们需要首先需要库:

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

然后连接到RabbitMQ服务器

amqp.connect('amqp://localhost', function(err, conn) {});

接下来我们创建一个频道(channel),这是大部分API所需要做的:

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {});
});

发送,我们必须申报队列给我们发送;然后我们可以发布消息到队列中:

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var q = 'hello';

    ch.assertQueue(q, {durable: false});
    // Note: on Node 6 Buffer.from(msg) should be used
    ch.sendToQueue(q, new Buffer('Hello World!'));
    console.log(" [x] Sent 'Hello World!'");
  });
});

声明一个队列是幂等的——只有当它不存在时才会被创建。消息内容是一个字节数组,所以你可以使用任何编码。 最后,我们关闭连接并退出:

setTimeout(function() { conn.close(); process.exit(0) }, 500);

所以,整个代码如下:

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var q = 'hello';
    var msg = 'Hello World!';

    ch.assertQueue(q, {durable: false});
    // Note: on Node 6 Buffer.from(msg) should be used
    ch.sendToQueue(q, new Buffer(msg));
    console.log(" [x] Sent %s", msg);
  });
  setTimeout(function() { conn.close(); process.exit(0) }, 500);
});

发送端故障 如果这是您第一次使用RabbitMQ,并且您没有看到“已发送”消息,那么您可能会不知所措。也许broker启动没有足够的可用磁盘空间(默认情况下,它至少需要200 MB空闲空间),因此拒绝接受消息。检查代理日志文件以确认并在必要时减少限制。配置文件文档将告诉你如何设置disk_free_limit

接收端

这是我们的接收方。消费者获取从RabbitMQ推送的消息,因此与发布单个消息的发布者不同,我们将持续运行以监听消息并将其打印出来。

代码(在receive.js中)和send有相同的要求:

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

设置与发布者相同;我们打开一个连接和一个通道,并声明我们将要使用的队列。注意这与sendToQueue发布到的队列相匹配。

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var q = 'hello';

    ch.assertQueue(q, {durable: false});
  });
});

请注意,我们在这里也声明队列。因为我们可能会在发布者之前启动消费者,所以我们希望确保队列存在,然后再试图使用消息。 我们即将告诉服务器将队列中的消息传递给我们。由于它会异步推送消息,因此我们提供了一个回调函数,当RabbitMQ向消费者推送消息时,将执行回调函数。这是Channel.consume所做的。

console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
ch.consume(q, function(msg) {
  console.log(" [x] Received %s", msg.content.toString());
}, {noAck: true});

完整receive.js如下:

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var q = 'hello';

    ch.assertQueue(q, {durable: false});
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
    ch.consume(q, function(msg) {
      console.log(" [x] Received %s", msg.content.toString());
    }, {noAck: true});
  });
});

执行

现在我们可以运行这两个脚本。在终端中,运行发布者:

./send.js

然后运行接收者:

./receive.js

消费者将通过RabbitMQ打印从发布者处获得的消息。消费者将继续运行,等待消息(使用Ctrl-C停止它),所以尝试从另一个终端运行发布者。

列出队列 你可能想看看RabbitMQ有什么队列,有多少条消息。您可以作为特权用户使用rabbitmqctl工具执行此操作:

sudo rabbitmqctl list_queues

在Windows上,省略sudo:

rabbitmqctl.bat list_queues