Varobj

2020-11-24

RabbitMQ简单入门



RabbitMQ - 是实现AMQP协议的一种消息队列中间件(英文是:message broker),目前属于 VMware 公司下。其内部结构最简单描述如下:

Publisher(生产者)  --->  Channel(信道)  ----> Consumer(消费者)


// RabbitMQ 内部包含:

                  bindings(exchange,routing—key)
Exchange(交换器) ---------------------------------> Queue(队列)

生产者通过信道发布消息,发布消息时不是指定队列,而是指定交换器或者指定路由键(routing-key),由绑定关系bindings交换器经过路由键交换器或者队列的规则)来确定消息存在哪个队列上。其中引入了 3 个概念,一个是交换器,一个是路由键,还有一个绑定关系。但这些都不是必须手动设置的,实际使用中,只要创建一个队列,就有一个和队列同名的默认的路由键,和默认的绑定关系。也就是说,发布消息时,只需要指定路由键队列名称RabbitMQ 就会把消息发送到这个队列上

$connection = new AMQPStreamConnection('192.168.66.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$msg = new AMQPMessage('you are boy');

// 假设 test 消息队列存在,如果没有会报错,或者程序来 declare 一个队列。
$channel->basic_publish(
    $msg,   // msg
    '',     // exchange
    'test'  //routing_key
);

1. 基础

队列中包含的数据是指定格式的消息(Mesages)

1.1 队列属性

每个队列都包含很多属性,如:

其中,Exclusive属性设置了就是一个独占队列,是一个特殊的队列,它只能在一个socket连接中使用,且连接释放就自动删除,所以不要给独占队列设置名称,如果有其他进程或者线程声明相同名称的队列,会报错误,提示你无法打开当前队列。独占队列的使用场景,大概是为了单进程/线程,自己生产消息,经过业务逻辑计算后,再自己消费消息,无法使用异步的方式,也无法体现消息队列的好处(异步),所以不太明白独占队列的好处是什么!

1.2 消息属性

一个队列包含很多消息,队列包含一些属性,同样地消息也有属性,一般在生产者发布消息时设置,常见如:

1.3 消息存储位置

消息的存储位置,或者是在内存中,或者是在磁盘中。区别在于持久属性(Durability)的设置,是持久性队列(英文是:durable queue),就存储在磁盘中(当然内存也有),是非持久队列(英文是:transient queue)一般存储在内存中,为什么是一般呢?因为当机器的可用内存变小,达到内存阈值之下,也会把非持久消息刷到磁盘中,它依赖服务端的参数内存高水位 vm_memory_high_watermark.relative 设置的百分比,默认 0.4 表示 40% 可用内存。

1.4 消息排序

我们都知道队列是FIFO(先进先出)的次序,RabbitMQ一般也是按照FIFO的方式将消息入队和出队的。但是例外的情况有,如果是多个消费者消息包含优先级重发消息 三种情况时,就不能保证绝对的FIFO了。

多个消费者的情况下,且每个消费者的相同的优先级,队列使用轮询(round-robin)的方式发送消息给消费者。round-robin的方式类似于,小孩子分果子游戏,即每个消费者 只能 获取第 M % N + 1 个消息,M 为消息编号,N 为同时刻消费者的个数,每个消费者获取消息个数相对均衡。但有一个潜在的问题,就是当消息但确认机制设置为手动模式,每次投递消息后必须等待消费者发送但确认消息,才能继续投递下一个消息给当前消费者,如果某一个消费者处理的速度太慢,会造成有的消费者早就处理完 x 个消息,但是部分消费者还在运行中。所以第一种情况有多个消费者,就无法保证消息的出队列次序和入队列的次序一样了。

参考文档 “消息排序”

1.5 消息拉取模式

不管是什么消息队列,消费者最终接收到消息,一般都有两种模式:一种是推模式也就是push api,另一种是拉模式也就是pull api

1.6 确认机制

服务节点投递消息给消费者时,为了确认消息是否送达,一般需要确认机制(英文是:ackacknowledgements),来保证消费者确实收到了消息队列投递的消息。确认机制有两种:

一般认为自动确认模式是不可靠的,即不能保证消息送达,但自动模式节省了消息投递的时间,所以吞吐量更大,同时也带来一个问题,就是消费者过载的问题:因为自动模式不像手动模式那样有信道预期投放限制(channel perfetch settings),容易导致在消费者的消费速度不足时,由于投放太快的消息,都缓存在内存中,消耗完栈空间。所以需要在使用自动模式时,一定要留意自己的消费者能力,是否有足够的处理速度。

basic.qos 也就是上面的信道投放限制,它是通过定义一个信道最大未确认的消息个数,来限制投放速度来实现的。

3. 高级特性

RabbitMQ 也有一些高级特性,如:

3.1 懒惰队列

消息队列中持久化消息,会在收到消息之后持久化到磁盘中,同时缓存在内存中;非持久化消息存储在内存中,但当遇到设置的内存阈值,也会批量将消息刷到磁盘,批量刷消息到磁盘会阻塞消息的处理,无法接收新的消息,为了解决此类问题,RabbitMQ引入懒惰队列,他可以尽可能地将消息(不管是持久化消息,还是非持久化消息)刷到磁盘中保存,内存只保留部分消息,所以适合来不及消费,会堆积大量的消息,但又不会占用太多内存的场景。

使用方式,声明队列时,设置x-queue-mode的属性为lazy即可。

3.2 优先级队列

优先级队列(Priority Queue)是在队列和消息上增加优先级属性,范围 1 ~ 255 之间,默认没有优先级,如果设置了队列的最大优先级,且消息设置了优先级属性,那么消费的出队次序收到优先级的影响,数值越大越优先出队。 优先级设置有效的关键点:

示例 PHP 代码代码参考 优先级队列的使用

3.3 死信队列

死信(Dead Letter)-顾名思义,就是无法被消费者消费的消息,有三种情况会产生私信:

死信队列就是专门保存死信的一些队列,具体使用如下

死信队列的常用场景是,结合TTLDLXDLK来实现延迟消息,比如取消超时(30分钟)未付款的订单等。具体方法如下:

所有新的订单推送到队列new.order.for.cancel上,这个队列不去使用,只等待30分钟过期后,里面的订单消息会变成死信,由于队列new.order.for.cancel设置了死信交换器cancle.order和路由键cancel.order,所以走设置的而不走默认的交换器+路由键,经过绑定关系查询,从cancel.orderexchange经过cancel.orderrouting key,消息会投递到队列dead.order.for.cancel上,而不是当前队列new.order.for.cancel上。

这个时候,开启一个取消订单的脚本进程,消费队列dead.order.for.cancel里的订单,肯定是超过30分钟的,然后查询是否付款,没有则取消之。

如果你觉得麻烦,可以使用一款轻量级的延时队列Beanstalkd,它的特色就是优先级和延迟时间;它的消息大概有几种状态,分别是:就绪中、延迟中、处理中、休眠中等,如果一个消息设置了延迟时间,那么它的状态是延迟中,内部有一个计时器,当延迟时间过了之后,才会是就绪中的状态,消费者只能获取就绪中状态的消息。内部的几种状态也有一些转换路径。

4. 使用

推荐使用docker安装体验,打开localhost:15672自带的管理Web端

4.1 常用管理命令

查看支持的命令

root@d278bfbae965:/# rabbitmqadmin list consumers

查看队列


root@d278bfbae965:/# rabbitmqadmin list queues
+-------+----------+
| name  | messages |
+-------+----------+
| hello | 0        |
+-------+----------+

查看消费者

root@d278bfbae965:/# rabbitmqadmin list consumers
+--------------+--------+-----------------+---------------------------------+-----------+----------------+
| ack_required | active | activity_status |          consumer_tag           | exclusive | prefetch_count |
+--------------+--------+-----------------+---------------------------------+-----------+----------------+
| False        | True   | up              | amq.ctag-ICS6hI7xYzbpj5-0r8k5VA | True      | 0              |
+--------------+--------+-----------------+---------------------------------+-----------+----------------+

4.2 优先级队列的使用

sender.php

require_once '../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('192.168.66.1', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare(
    'priority-hello',
    false,
    false,
    false,
    false,
    false,
    [
        'x-max-priority' => ['I', 10]
    ]
);

for ($i = 0; $i < 10; $i++) {
    $args = [
        'delivery_mode' => 2,
        'priority' => 1
    ];
    if ($i === 3) {
        $args['priority'] = 2;
    }
    $msg = new AMQPMessage('Hello world - ' . $i, $args);
    $channel->basic_publish($msg, '', 'priority-hello');
}

echo " Send 'Hello world'\n";

$channel->close();
$connection->close();

receiver.php

require_once '../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('192.168.66.1', 5672, 'guest', 'guest');
$channel = $connection->channel();

echo "Waiting for messages\n";

$callback = function ($msg) {
    sleep(1);
    $body = $msg->body;
    echo 'receive ' . $body . PHP_EOL;
};

$channel->basic_consume('priority-hello', '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

消费者输出:

Waiting for messages
receive Hello world - 3
receive Hello world - 0
receive Hello world - 1
receive Hello world - 2
receive Hello world - 4
receive Hello world - 5
receive Hello world - 6
receive Hello world - 7
receive Hello world - 8
receive Hello world - 9