2020-11-24
RabbitMQ简单入门
RabbitMQ
- 是实现AMQP
协议的一种消息队列中间件(英文是:message broker
),目前属于 VMware
公司下。其内部结构最简单描述如下:
Publisher(生产者) ---> Channel(信道) ----> Consumer(消费者)
// RabbitMQ 内部包含:
bindings(exchange,routing—key)
Exchange(交换器) ---------------------------------> Queue(队列)
生产者通过信道
发布消息,发布消息时不是指定队列
,而是指定交换器
或者指定路由键(routing-key
),由绑定关系
(bindings
:交换器
经过路由键
到交换器
或者队列
的规则)来确定消息存在哪个队列
上。其中引入了 3 个概念,一个是交换器
,一个是路由键
,还有一个绑定关系
。但这些都不是必须手动设置的,实际使用中,只要创建一个队列,就有一个和队列同名的默认的路由键
,和默认的绑定关系
。也就是说,发布消息时,只需要指定路由键
为队列名称
,RabbitMQ
就会把消息发送到这个队列上
$connection = new AMQPStreamConnection('192.168.66.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$msg = new AMQPMessage('you are boy');
// 假设 test 消息队列存在,如果没有会报错,或者程序来 declare 一个队列。
$channel->basic_publish(
$msg, // msg
'', // exchange
'test' //routing_key
);
1. 基础
队列中包含的数据是指定格式的消息(Mesages)
1.1 队列属性
每个队列都包含很多属性,如:
名称
,长度255个字节(utf8),可以自定义,也可以不指定队列名称,由服务端生成一个唯一的名称,一般和唯一性(Exclusive)
配合使用。名称是一个队列必须的属性持久性(Durable)
,决定当前队列在服务重启后是否保留唯一性(Exclusive)
, 只在一个连接中使用,且使用完毕就删除的队列自动删除(Auto-delete)
,在最后一个消费者取消订阅后,会自动删除队列- 其他可选参数
x-message-ttl (time to live)
过期时间,在队列上设置所有消息的ttl
x-max-length
长度限制x-max-priority
当前队列的优先级 优先级的使用x-queue-mode
消息模式可选lazy
参考懒惰消息的使用
其中,Exclusive
属性设置了就是一个独占队列
,是一个特殊的队列,它只能在一个socket
连接中使用,且连接释放就自动删除,所以不要给独占队列设置名称,如果有其他进程或者线程声明相同名称的队列,会报错误,提示你无法打开当前队列。独占队列的使用场景,大概是为了单进程/线程,自己生产消息,经过业务逻辑计算后,再自己消费消息,无法使用异步的方式,也无法体现消息队列的好处(异步),所以不太明白独占队列的好处是什么!
1.2 消息属性
一个队列包含很多消息,队列包含一些属性,同样地消息也有属性,一般在生产者发布消息时设置,常见如:
delivery mode
,Enum(1,2)
1 是非持久消息,2 是持久消息,也有客户端使用布尔值来表示,必选项,其他都是可选type
类型,程序自定义一些,方便分类expiration
和队列的x-message-ttl
一样,这个指定单个消息的过期时间,没有则继承队列属性,有则取两者最小值。timestamp
发送消息的时间戳- 等等
1.3 消息存储位置
消息的存储位置,或者是在内存中,或者是在磁盘中。区别在于持久属性(Durability
)的设置,是持久性队列
(英文是:durable queue
),就存储在磁盘中(当然内存也有),是非持久队列
(英文是:transient queue
)一般存储在内存中,为什么是一般
呢?因为当机器的可用内存变小,达到内存阈值之下,也会把非持久消息刷到磁盘中,它依赖服务端的参数内存高水位
vm_memory_high_watermark.relative
设置的百分比,默认 0.4 表示 40% 可用内存。
1.4 消息排序
我们都知道队列是FIFO
(先进先出)的次序,RabbitMQ
一般也是按照FIFO
的方式将消息入队和出队的。但是例外的情况有,如果是多个消费者
、消息包含优先级
、重发消息
三种情况时,就不能保证绝对的FIFO
了。
多个消费者的情况下,且每个消费者的相同的优先级,队列使用轮询(round-robin
)的方式发送消息给消费者。round-robin
的方式类似于,小孩子分果子游戏,即每个消费者 只能 获取第 M % N + 1 个消息,M 为消息编号,N 为同时刻消费者的个数,每个消费者获取消息个数相对均衡。但有一个潜在的问题,就是当消息但确认机制设置为手动模式,每次投递消息后必须等待消费者发送但确认消息,才能继续投递下一个消息给当前消费者,如果某一个消费者处理的速度太慢,会造成有的消费者早就处理完 x 个消息,但是部分消费者还在运行中。所以第一种情况有多个消费者
,就无法保证消息的出队列次序和入队列的次序一样了。
1.5 消息拉取模式
不管是什么消息队列,消费者最终接收到消息,一般都有两种模式:一种是推模式
也就是push api
,另一种是拉模式
也就是pull api
。
-
推模式 一般是客户端保持长连接,在连接中消费者注册处理消息的回调函数,队列一旦有消息,就会把消息推送给消费者,所以推模式的
消息及时性
高,是推荐使用的方式 -
拉模式
当然消费者也可以选择,自己在需要的时候,主动一条条从队列中获取消息,使用
basic.get
接口,但是这种方式大部分时间,队列处于空闲时间,系统的利用率很低,而且消息及时性
不高,所以不推荐使用拉取模式。
1.6 确认机制
服务节点投递消息给消费者时,为了确认消息是否送达,一般需要确认机制(英文是:ack
,acknowledgements
),来保证消费者确实收到了消息队列投递的消息。确认机制有两种:
- 自动模式,一旦消息队列发送消息后,就认为已经送达消费者
- 手动模式,消息队列发送消息后,在消费者收到消息后,必须发送明确的确认消息给消息队列,消息队列收到后才确认消息已经送达,否则就会被阻塞等待。确认消息包含几种情况:
basic.ack
肯定确认,消息队列收到后删除消息basic.nack
否定确认,消息队列收到后重投消息(requeue
属性设置为true
), 进入私信队列(requeue
属性设置为false
)basic.reject
否定确认,消息队列收到后删除消息- 如果发送后,消费者断开,消息会被重投
一般认为自动确认模式是不可靠的,即不能保证消息送达,但自动模式节省了消息投递的时间,所以吞吐量更大,同时也带来一个问题,就是消费者过载的问题:因为自动模式不像手动模式那样有信道预期投放限制(channel perfetch settings
),容易导致在消费者的消费速度不足时,由于投放太快的消息,都缓存在内存中,消耗完栈空间。所以需要在使用自动模式时,一定要留意自己的消费者能力,是否有足够的处理速度。
basic.qos
也就是上面的信道投放限制,它是通过定义一个信道最大未确认的消息
个数,来限制投放速度来实现的。
3. 高级特性
RabbitMQ
也有一些高级特性,如:
3.1 懒惰队列
消息队列中持久化消息,会在收到消息之后持久化到磁盘中,同时缓存在内存中;非持久化消息存储在内存中,但当遇到设置的内存阈值,也会批量将消息刷到磁盘,批量刷消息到磁盘会阻塞消息的处理,无法接收新的消息,为了解决此类问题,RabbitMQ
引入懒惰队列
,他可以尽可能地将消息(不管是持久化消息,还是非持久化消息)刷到磁盘中保存,内存只保留部分消息,所以适合来不及消费,会堆积大量的消息,但又不会占用太多内存的场景。
使用方式,声明队列时,设置x-queue-mode
的属性为lazy
即可。
3.2 优先级队列
优先级队列(Priority Queue)是在队列和消息上增加优先级属性,范围 1 ~ 255 之间,默认没有优先级,如果设置了队列的最大优先级,且消息设置了优先级属性,那么消费的出队次序收到优先级的影响,数值越大越优先出队。 优先级设置有效的关键点:
-
- 队列必须设置
x-max-priority
值,因为消息的优先级priority
属性默认是 0,如果大于x-max-priority
则取x-max-priority
的值,但x-max-priority
是没有默认值或者默认是 0 ,所以不设置x-max-priority
, 消息优先级设置了也没用效果
- 队列必须设置
-
- 消费者的消费速度不能太快,如果太快,生产者一但有消息,就会被消费,那么优先级设置的,也没有效果。
示例 PHP 代码代码参考 优先级队列的使用
3.3 死信队列
死信(Dead Letter)-顾名思义,就是无法被消费者消费的消息,有三种情况会产生私信:
-
- 手动确认模式(no_ack=false) 下,如果是
nack
,确设置requeue
为true
,那么这个消息会重新被投递当当前队列,且出队的位置尽量靠队首
- 手动确认模式(no_ack=false) 下,如果是
-
- 消息过期
-
- 消息队列长度超出限制
死信队列就是专门保存死信的一些队列,具体使用如下
-
- 创建一个死信交换器
exchange.dead
, 属性要设置为type=direct
,internal=true
(表示不能由生产者直接发送,只能用于绑定)
- 创建一个死信交换器
-
- 创建一个队列
queue.dead
,用于接收死信,顾名思义这个队列就是死信队列
- 创建一个队列
-
- 创建绑定关系
From=exchange.dead
&RoutingKey=exchange.dead
&To=queue.dead
- 创建绑定关系
-
- 创建业务队列
queue.biz
,属性x-dead-letter-exchange
设置为exchange.dead
。属性x-dead-letter-routing-key
设置为exchange.dead
,否则,需要创建新的绑定关系,From
To
和 3 一样,RoutingKey
设置成queue.biz
(因为每个队列有一个和队列名称同名的route key
)
- 创建业务队列
-
- 业务队列中的消息只要变成死信,就会自动经过
x-dead-letter-exchange
设置的死信交换器和x-dead-letter-routing-key
,路由到具体的死信队列上,也就是queue.dead
其中注意点是,成为死信的第一点,requeue 为 true 会成为死信,但如果队列设置了死信队列交换器,且死信路由键不是当前队列名称(或者默认没有设置),那么需要 requeue 设置为 false,消息才会转发给期望的死信队列,否则消息还是在原来的队列,只是状态重新变为可投递的。
- 业务队列中的消息只要变成死信,就会自动经过
死信队列的常用场景是,结合TTL
与DLX
和DLK
来实现延迟消息,比如取消超时(30分钟)未付款的订单等。具体方法如下:
-
- 创建死信交换器(DLX)
cancel.order
- 创建死信交换器(DLX)
-
- 创建新的队列用于存储所有的新订单,ttl=半小时,且dlx和dlk都只设置为
cancel.order
- 创建新的队列用于存储所有的新订单,ttl=半小时,且dlx和dlk都只设置为
-
- 创建新的队列用于存储所有超过30分钟的订单
- 创建新的队列用于存储所有超过30分钟的订单
-
- 创建绑定关系,指向 3 的队列
- 创建绑定关系,指向 3 的队列
所有新的订单推送到队列new.order.for.cancel
上,这个队列不去使用,只等待30分钟过期后,里面的订单消息会变成死信,由于队列new.order.for.cancel
设置了死信交换器cancle.order
和路由键cancel.order
,所以走设置的而不走默认的交换器+路由键,经过绑定关系查询,从cancel.order
的exchange
经过cancel.order
的routing key
,消息会投递到队列dead.order.for.cancel
上,而不是当前队列new.order.for.cancel
上。
这个时候,开启一个取消订单的脚本进程,消费队列dead.order.for.cancel
里的订单,肯定是超过30分钟的,然后查询是否付款,没有则取消之。
如果你觉得麻烦,可以使用一款轻量级的延时队列Beanstalkd
,它的特色就是优先级和延迟时间;它的消息大概有几种状态,分别是:就绪中、延迟中、处理中、休眠中等,如果一个消息设置了延迟时间,那么它的状态是延迟中,内部有一个计时器,当延迟时间过了之后,才会是就绪中的状态,消费者只能获取就绪中状态的消息。内部的几种状态也有一些转换路径。
4. 使用
推荐使用docker
安装体验,打开localhost:15672
自带的管理Web端
4.1 常用管理命令
查看支持的命令
root@d278bfbae965:/# rabbitmqadmin list consumers
查看队列
root@d278bfbae965:/# rabbitmqadmin list queues
+-------+----------+
| name | messages |
+-------+----------+
| hello | 0 |
+-------+----------+
查看消费者
root@d278bfbae965:/# rabbitmqadmin list consumers
+--------------+--------+-----------------+---------------------------------+-----------+----------------+
| ack_required | active | activity_status | consumer_tag | exclusive | prefetch_count |
+--------------+--------+-----------------+---------------------------------+-----------+----------------+
| False | True | up | amq.ctag-ICS6hI7xYzbpj5-0r8k5VA | True | 0 |
+--------------+--------+-----------------+---------------------------------+-----------+----------------+
4.2 优先级队列的使用
sender.php
require_once '../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('192.168.66.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare(
'priority-hello',
false,
false,
false,
false,
false,
[
'x-max-priority' => ['I', 10]
]
);
for ($i = 0; $i < 10; $i++) {
$args = [
'delivery_mode' => 2,
'priority' => 1
];
if ($i === 3) {
$args['priority'] = 2;
}
$msg = new AMQPMessage('Hello world - ' . $i, $args);
$channel->basic_publish($msg, '', 'priority-hello');
}
echo " Send 'Hello world'\n";
$channel->close();
$connection->close();
receiver.php
require_once '../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('192.168.66.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
echo "Waiting for messages\n";
$callback = function ($msg) {
sleep(1);
$body = $msg->body;
echo 'receive ' . $body . PHP_EOL;
};
$channel->basic_consume('priority-hello', '', false, true, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();
消费者输出:
Waiting for messages
receive Hello world - 3
receive Hello world - 0
receive Hello world - 1
receive Hello world - 2
receive Hello world - 4
receive Hello world - 5
receive Hello world - 6
receive Hello world - 7
receive Hello world - 8
receive Hello world - 9