RabbitMq+死信队列笔记

深渊向深渊呼唤

文章内容输出来源:拉勾教育Java高薪训练营

报了拉勾课程,我从忍气吞声到大胆支出技术领导的误判,从兢兢业业加班到朝9晚5的生活方式,从还投简历却杳无音讯到电话响到无电。

RabbitMQ安装

安装socat

yum install socat -y

下载Erlang和RabbitMQ安装包

wget https://github.com/rabbitmq/erlang-rpm/releases/download/v23.0.2/erlang-23.0.2-1.el7.x86_64.rpm
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.5/rabbitmq-server-3.8.5-1.el7.noarch.rpm

安装Erlang和RabbitMQ

rpm -ivh erlang-23.0.2-1.el7.x86_64.rpm
rpm -ivh rabbitmq-server-3.8.4-1.el7.noarch.rpm

启用RabbitMQ的管理插件

rabbitmq-plugins enable rabbitmq_management

启用RabbitMQ

systemctl start rabbitmq-server

添加用户

rabbitmqctl add_user root 6342180

给用户添加权限和设置标签

rabbitmqctl set_permissions root -p / ".*" ".*" ".*"
rabbitmqctl set_user_tags root administrator

死信队列介绍

用户下单,调用订单服务,这时候订单系统与派单系统采用mq异步通讯。 在定义业务队列时可以考虑指定一个死信交换机,并绑定一个死信队列。当业务队列的消息过期时,会自动变成死信。 该消息会被发送到死信队列上。 DLX,全称为Dead-Letter-Exchange,死信交换器。消息在一个队列中变成死信(Dead Letter) 之后,被重新发送到一个特殊的交换器(DLX)中,同时,绑定DLX的队列就称为“死信队列”。 以下几种情况导致消息变为死信:

    消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false; 消息过期; 队列达到最大长度。 对于RabbitMQ 来说,DLX 是一个非常有用的特性。它可以处理异常情况下,消息不能够被 消费者正确消费(消费者调用了Basic.Nack 或者Basic.Reject)而被置入死信队列中的情况,后 续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善 和优化系统。

springboot案例

案例源码:https://gitee.com/xiaolu7_laosiji/mq_demo.git 我们下载案例项目,修改对应的application.yml,运行com.lagou.mq.MqApplication启动项目

打开com.lagou.mq.config.RabbitConfig,可以查看到对应的mq队列配置如下:

/**
     * 以下三个是死信队列
     * @return
     */
    @Bean
    public Exchange exchangeOrderTTL() {
        return new DirectExchange("ex.order_ttl");
    }

    @Bean
    public Queue queueOrderTTL() {
        return new Queue("queue.order_ttl");
    }

    @Bean
    public Binding bindingOrderTTL() {
        // ? 以下的key.dlx可以随便取?
        return BindingBuilder
                .bind(queueOrderTTL())
                .to(exchangeOrderTTL())
                .with("key.dlx").noargs();
    }

    /**
     * 以下三个是正常的队列
     * @return
     */
    @Bean
    public Exchange exchangeOrder() {
        return new DirectExchange("ex.order");
    }
    @Bean
    public Queue queueOrder() {
        Map<String, Object> props = new HashMap<>();
        props.put("x-message-ttl", 10000);
        props.put("x-dead-letter-exchange", "ex.order_ttl");
        props.put("x-dead-letter-routing-key", "key.dlx");
        return QueueBuilder.durable("queue.order")
                .withArguments(props)
                .build();
    }

    @Bean
    public Binding bindingOrder() {
        return BindingBuilder
                .bind(queueOrder())
                .to(exchangeOrder())
                .with("key.biz").noargs();
    }


注意,正常队列要设置队列过期时间,并不设定监听器。我们可以编写一个死信队列监听器,实现订单过期的逻辑。 之后,我们访问http://127.0.0.1:8080/order/buy?name=联想&count=1 进行商品购买,商品购买时,会将orderId(订单ID),加入正常的队列。

我们演示中设定队列的过期时间为10秒,在10秒中内,如果用户没有操作,订单ID会自动进入死信队列。 可以调用http://127.0.0.1:8080/order/pay?id=1291032804963991553 手动进行订单确认。 在死信队列中,会对所有订单ID进行监听,如果有ID过期了,系统会判断该订单是否已完成支付。 如果未完成支付,订单会自动取消。

栏目