之前我们讲过用死信队列来实现延迟队列
今天我们介绍一种新方法,使用延迟插件来实现延迟队列。
首先我先要安装一下这个插件
插件下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/tags
找到和自己rabbitmq版本相同的版本进行下载,我的rabbitmq版本是3.9的,所以我下载3.9.0的
将下载好的rabbitmq_delayed_message_exchange-3.9.0.ez这个文件上传到/usr/lib/rabbitmq/lib/rabbitmq_server-3.9.1/plugins(这个目录是rabbitmq的插件目录,找到你自己的rabbitmq的插件目录)这个目录下
然后运行下面这行命令,启用这个插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
查看插件列表
rabbitmq-plugins list
插件启用成功后,交换器类型会增加"x-delayed-message"这种类型
接下来就可以开始用了
生产者:
<?php
require_once __DIR__.'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$vhost = 'order';
$exc_name = 'delay_exc_pay';
$routing_key = 'delay_route_pay';
$queue_name = 'delay_queue_pay';
$ttl = 20000;//消息过期时间,单位是毫秒
$connection = new AMQPStreamConnection('localhost',5672,'guest','guest',$vhost);
$channel = $connection->channel();
$channel->exchange_declare($exc_name,'x-delayed-message',false,true,false);
$args = new AMQPTable(['x-delayed-type'=>'direct']);
$channel->queue_declare($queue_name,false,true,false,false,false,$args);
$channel->queue_bind($queue_name,$exc_name,$routing_key);
$data = 'this is delayed message';
$arr = ['delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT,'application_headers'=>new AMQPTable(['x-delay'=>$ttl])];
$msg = new AMQPMessage($data,$arr);
$channel->basic_publish($msg,$exc_name,$routing_key);
$channel->close();
$connection->close();
require_once __DIR__.'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
$vhost = 'order';
$exc_name = 'delay_exc_pay';
$routing_key = 'delay_route_pay';
$queue_name = 'delay_queue_pay';
$ttl = 20000;//消息过期时间,单位是毫秒
$connection = new AMQPStreamConnection('localhost',5672,'guest','guest',$vhost);
$channel = $connection->channel();
$channel->exchange_declare($exc_name,'x-delayed-message',false,true,false);
$args = new AMQPTable(['x-delayed-type'=>'direct']);
$channel->queue_declare($queue_name,false,true,false,false,false,$args);
$channel->queue_bind($queue_name,$exc_name,$routing_key);
$data = 'this is delayed message';
$arr = ['delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT,'application_headers'=>new AMQPTable(['x-delay'=>$ttl])];
$msg = new AMQPMessage($data,$arr);
$channel->basic_publish($msg,$exc_name,$routing_key);
$channel->close();
$connection->close();
运行生产者的代码,会报个错,需要我们手动创建一下交换器,进入到web管理界面,按照下图方式操作
消费者:
<?php
require_once __DIR__.'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$vhost = 'order';
$exc_name = 'delay_exc_pay';
$routing_key = 'delay_route_pay';
$queue_name = 'delay_queue_pay';
$connection = new AMQPStreamConnection('localhost',5672,'guest','guest',$vhost);
$channel = $connection->channel();
$channel->exchange_declare($exc_name,'x-delayed-message',false,true,false);
$channel->queue_bind($queue_name,$exc_name,$routing_key);
$callback = function($msg){
echo $msg->body."\n";
$msg->ack();
};
$channel->basic_qos(null,1,null);
$channel->basic_consume($queue_name,'',false,false,false,false,$callback);
while($channel->is_open()){
$channel->wait();
}
$channel->close();
$connection->close();
require_once __DIR__.'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$vhost = 'order';
$exc_name = 'delay_exc_pay';
$routing_key = 'delay_route_pay';
$queue_name = 'delay_queue_pay';
$connection = new AMQPStreamConnection('localhost',5672,'guest','guest',$vhost);
$channel = $connection->channel();
$channel->exchange_declare($exc_name,'x-delayed-message',false,true,false);
$channel->queue_bind($queue_name,$exc_name,$routing_key);
$callback = function($msg){
echo $msg->body."\n";
$msg->ack();
};
$channel->basic_qos(null,1,null);
$channel->basic_consume($queue_name,'',false,false,false,false,$callback);
while($channel->is_open()){
$channel->wait();
}
$channel->close();
$connection->close();
这样就通过延迟插件实现了延迟队列的功能。