rabbitmq延迟插件实现延迟队列

之前我们讲过用死信队列来实现延迟队列

rabbitmq死信队列实现延迟队列

今天我们介绍一种新方法,使用延迟插件来实现延迟队列。

首先我先要安装一下这个插件

插件下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/tags

找到和自己rabbitmq版本相同的版本进行下载,我的rabbitmq版本是3.9的,所以我下载3.9.0的

将下载好的rabbitmq_delayed_message_exchange-3.9.0.ez这个文件上传到/usr/lib/rabbitmq/lib/rabbitmq_server-3.9.1/plugins(这个目录是rabbitmq的插件目录,找到你自己的rabbitmq的插件目录)这个目录下

然后运行下面这行命令,启用这个插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

查看插件列表

rabbitmq-plugins list

如果插件前面的中括号里有E*,说明插件已经启用成功了。

插件启用成功后,交换器类型会增加"x-delayed-message"这种类型

接下来就可以开始用了

生产者:

<?php
require_once __DIR__.'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$vhost = 'order';

$exc_name = 'delay_exc_pay';
$routing_key = 'delay_route_pay';
$queue_name = 'delay_queue_pay';
$ttl = 20000;//消息过期时间,单位是毫秒

$connection = new AMQPStreamConnection('localhost',5672,'guest','guest',$vhost);
$channel = $connection->channel();

$channel->exchange_declare($exc_name,'x-delayed-message',false,true,false);
$args = new AMQPTable(['x-delayed-type'=>'direct']);
$channel->queue_declare($queue_name,false,true,false,false,false,$args);
$channel->queue_bind($queue_name,$exc_name,$routing_key);

$data = 'this is delayed message';
$arr = ['delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT,'application_headers'=>new AMQPTable(['x-delay'=>$ttl])];
$msg = new AMQPMessage($data,$arr);
$channel->basic_publish($msg,$exc_name,$routing_key);
$channel->close();
$connection->close();

运行生产者的代码,会报个错,需要我们手动创建一下交换器,进入到web管理界面,按照下图方式操作

消费者:

<?php
require_once __DIR__.'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$vhost = 'order';

$exc_name = 'delay_exc_pay';
$routing_key = 'delay_route_pay';
$queue_name = 'delay_queue_pay';

$connection = new AMQPStreamConnection('localhost',5672,'guest','guest',$vhost);
$channel = $connection->channel();
$channel->exchange_declare($exc_name,'x-delayed-message',false,true,false);
$channel->queue_bind($queue_name,$exc_name,$routing_key);
$callback = function($msg){
    echo $msg->body."\n";
    $msg->ack();
};
$channel->basic_qos(null,1,null);
$channel->basic_consume($queue_name,'',false,false,false,false,$callback);
while($channel->is_open()){
    $channel->wait();
}
$channel->close();
$connection->close();

这样就通过延迟插件实现了延迟队列的功能。

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: