rabbitmq死信队列实现延迟队列

rabbitmq如果设置了死信队列,消息过期后就会进入到死信队列。

我们可以利用死信队列这一特性,来实现延迟队列。只要给消息设置一个过期时间,消息过期就会自动进入死信队列,消费者只要监听死信队列就可以实现延迟队列了。

应用场景:订单在一段时间内未支付则自动取消

下面以一个简单的例子来讲解,设置消息过期时间为20秒,生产者生产消息20秒之后,消息会进入到死信队列,消费者监听死信队列,就可以实现延迟队列。

生产者:

<?php
require_once __DIR__.'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$vhost = 'order';

$exc_name = 'exc_pay';
$routing_key = 'route_pay';
$queue_name = 'queue_pay';
$ttl = 20000;//消息过期时间,单位是毫秒

$dead_exc_name = 'dead_exc_pay';//死信交换器名称
$dead_routing_key = 'dead_route_pay';//死信routing_key
$dead_queue_name = 'dead_queue_pay';//死信队列名称

$connection = new AMQPStreamConnection('localhost',5672,'guest','guest',$vhost);
$channel = $connection->channel();

$channel->exchange_declare($exc_name,'direct',false,false,false);
$args = new AMQPTable(['x-message-ttl'=>$ttl,'x-dead-letter-exchange'=>$dead_exc_name,'x-dead-letter-routing-key'=>$dead_routing_key]);
$channel->queue_declare($queue_name,false,true,false,false,false,$args);
$channel->queue_bind($queue_name,$exc_name,$routing_key);
//声明死信交换器和队列
$channel->exchange_declare($dead_exc_name,'direct',false,false,false);
$channel->queue_declare($dead_queue_name,false,true,false,false,false);
$channel->queue_bind($dead_queue_name,$dead_exc_name,$dead_routing_key);

$data = 'this is dead message';
$msg = new AMQPMessage($data,['delivery_mode'=>AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($msg,$exc_name,$routing_key);
$channel->close();
$connection->close();

消费者:

<?php
require_once __DIR__.'/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$vhost = 'order';

$dead_exc_name = 'dead_exc_pay';
$dead_routing_key = 'dead_route_pay';
$dead_queue_name = 'dead_queue_pay';

$connection = new AMQPStreamConnection('localhost',5672,'guest','guest',$vhost);
$channel = $connection->channel();
$channel->exchange_declare($dead_exc_name,'direct',false,false,false);
$channel->queue_bind($dead_queue_name,$dead_exc_name,$dead_routing_key);
$callback = function($msg){
    echo $msg->body."\n";
    $msg->ack();
};
$channel->basic_qos(null,1,null);
$channel->basic_consume($dead_queue_name,'',false,false,false,false,$callback);
while($channel->is_open()){
    $channel->wait();
}
$channel->close();
$connection->close();

rabbitmq除了用死信队列这种方式来实现延迟队列,还可以用延迟插件来实现,延迟插件的实现方式更好,推荐使用延迟插件

rabbitmq延迟插件实现延迟队列

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: