一篇文章带你彻底玩转-RabbitMQ_rabbitmq中文手册

一篇文章带你彻底玩转-RabbitMQ_rabbitmq中文手册

解决方案goocz2025-02-27 12:43:3421A+A-

第一部分

1、什么是 MQ

MQ(消息队列),顾名思义,本质上是一种先入先出的队列,只不过其中存放的是消息。它也是一种跨进程的通信机制,用于上下游传递信息。在互联网架构中,MQ 是一种常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用 MQ 后,消息发送上游只需依赖 MQ,无需依赖其他服务。

2、为什么要用 MQ

1、流量消峰

举个例子,假设订单系统最多能处理一万次订单。在正常时段,这个处理能力足够应对用户的下单需求,因为用户下单后一秒就能收到结果。然而,在高峰期,如果有两万次下单,操作系统就无法处理这么多订单,只能限制订单超过一万后不允许用户下单。

为了解决这个问题,我们可以使用消息队列作为缓冲区。通过这种方式,我们可以取消对订单数量的限制,将一秒内的订单分散到一段时间内进行处理。虽然有些用户可能需要等待十几秒才能收到下单成功的操作,但这种体验仍然比无法下单要好得多。

2、应用解耦

以电商应用为例,应用中包括订单系统、库存系统、物流系统和支付系统。当用户创建订单后,如果直接调用库存系统、物流系统和支付系统,一旦其中任何一个子系统出现故障,都会导致下单操作异常。然而,当采用基于消息队列的方式后,系统间调用的问题会大大减少。例如,当物流系统发生故障并需要几分钟来修复时,物流系统需要处理的订单信息会被缓存在消息队列中。这样,用户的下单操作可以正常完成,而不受物流系统故障的影响。当物流系统恢复后,它只需继续处理订单信息即可。对于下单的用户来说,他们几乎感受不到物流系统的故障,从而提升了整个系统的可用性。

3、异步处理

有些服务之间的调用是异步的,例如A调用B,B需要花费很长时间执行。但是A需要知道B什么时候可以执行完。以前一般有两种常见的方式:一种是A过一段时间去调用B的查询API查询;另一种是A提供一个回调API,B执行完之后调用该API通知A服务。然而,这两种方式都不是很优雅。

使用消息总线可以很方便地解决这个问题。当A调用B服务后,只需要监听B处理完成的消息。一旦B处理完成,它会发送一条消息给消息队列(MQ)。然后,MQ会将此消息转发给A服务。这样,A服务既不需要循环调用B的查询API,也不需要提供回调API。同样地,B服务也不需要做这些操作。通过这种方式,A服务能够及时地得到异步处理成功的消息。


3、MQ 的分类

  1. ActiveMQ: 优点:单机吞吐量达到万级,时效性在毫秒级别,可用性高,基于主从架构实现高可用性,消息可靠性较低的概率丢失数据。 缺点:官方社区对ActiveMQ 5.x的维护越来越少,高吞吐量场景较少使用
  2. Kafka: 优点:性能卓越,单机写入吞吐量约在百万条/秒,最大的优点是吞吐量高。时效性在毫秒级别,可用性非常高,Kafka是分布式的,一个数据多个副本,少数机器宕机不会丢失数据,不会导致不可用。消费者采用Pull方式获取消息,消息有序,通过控制能够保证所有消息被消费且仅被消费一次。有优秀的第三方Kafka Web管理界面Kafka-Manager。在日志领域比较成熟,被多家公司和多个开源项目使用。功能支持较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用缺点:Kafka单机超过64个队列/分区时,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长。使用短轮询方式,实时性取决于轮询间隔时间。消费失败不支持重试。支持消息顺序,但是一台代理宕机后,就会产生消息乱序。社区更新较慢
  3. RocketMQ: 优点:单机吞吐量达到十万级,可用性非常高,分布式架构,消息可以做到0丢失。MQ功能较为完善,还是分布式的,扩展性好,支持10亿级别的消息堆积,不会因为堆积导致性能下降。源码是Java,我们可以自己阅读源码,定制自己公司的MQ。 缺点:支持的客户端语言不多,目前是Java及C++,其中C++不成熟。社区活跃度一般,没有在MQ核心中去实现JMS等接口,有些系统要迁移需要修改大量代码。
  4. RabbitMQ: 优点:由于Erlang语言的高并发特性,性能较好;吞吐量达到万级,MQ功能比较完备。健壮、稳定、易用、跨平台、支持多种语言如Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等。支持AJAX文档齐全;开源提供的管理界面非常棒,用起来很好用。社区活跃度高;更新频率相当高缺点:商业版需要收费,学习成本较高。

4、MQ 的选择

  1. ActiveMQ 是一个开源的消息队列系统,具有高可靠性和高性能的特点。它支持多种消息传递模式,包括点对点、发布/订阅和请求/响应等。ActiveMQ 提供了丰富的功能和灵活的配置选项,适用于各种企业级应用程序的开发和部署。
  2. RabbitMQ 是一个用 Erlang 语言编写的开源消息队列系统,具有高可靠性和可扩展性。它支持多种消息传递模式,包括点对点、发布/订阅和路由等。RabbitMQ 提供了丰富的插件和工具,可以方便地与其他系统集成,并提供了强大的管理和监控功能。
  3. Kafka 是一个分布式流处理平台,主要用于大规模数据流的处理和存储。它具有高吞吐量、低延迟和可扩展性的特点,适用于构建实时数据处理和分析系统。Kafka 支持多个消费者组和分区,可以实现负载均衡和容错机制。
  4. RocketMQ 是阿里巴巴开源的一款分布式消息中间件,具有高可靠性、高吞吐量和低延迟的特点。它支持多种消息传递模式,包括事务消息、定时消息和顺序消息等。RocketMQ 提供了丰富的配置选项和监控工具,适用于构建大规模的分布式系统。

5、RabbitMQ 的概念

在实际应用中,RabbitMQ扮演了一个可靠的通信中介的角色。可以把它想象成一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里。同理,当您需要发送一条消息时,你只需要将该消息发送到RabbitMQ服务器,RabbitMQ就会负责将这个消息路由到正确的目的地。

6、四大核心概念

生产者

产生数据发送消息的程序是生产者

交换机

交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定

队列

队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式

消费者

消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

7、RabbitMQ的六种工作模式

7.1、simple简单模式

1、 消息产生着将消息放入队列;

2、 消息的消费者(consumer)监听(while)消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端);

7.2、work工作模式(资源的竞争)

1、 消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2,同时监听同一个队列,消息被消费?C1C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize,与同步锁的性能不一样)保证一条消息只能被一个消费者使用);

2、 应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢);

7.3、publish/subscribe发布订阅(共享资源)

1、 X代表交换机rabbitMQ内部组件,erlang消息产生者是代码完成,代码的执行效率不高,消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费;

2、 相关场景:邮件群发,群聊天,广播(广告);

7.4、routing路由模式

1、 消息生产者将消息发送给交换机按照路由判断,路由是字符串(info)当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;;

2、 根据业务功能定义路由字符串;

3、 从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中业务场景:error通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误;;

7.5、topic 主题模式(路由模式的一种)

1、 星号井号代表通配符; 2、 星号代表多个单词,井号代表一个单词; 3、 路由功能添加模糊匹配; 4、 消息产生者产生消息,把消息交给交换机; 5、 交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费;

7.6、RPC 模式

RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:

1、 客户端即是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列;

2、 服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果;

3、 服务端将RPC方法的结果发送到RPC响应队列;

4、 客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果;

8、RabbitMQ工作原理

Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker

Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等

Connection:publisher/consumer 和 broker 之间的 TCP 连接

Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的Connection 极大减少了操作系统建立 TCP connection 的开销

Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout(multicast)

Queue:消息最终被送到这里等待 consumer 取走

Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

第二部分

这两个知识点可以查看主页其他两个文章

RbbitMQ 持久化和权重分配信息

RaabitMq死信队列

第三部分

1、导入依赖


  org.springframework.boot
  spring-boot-starter-amqp


2、simple简单模式

P:生产者,也就是要发送消息的程序

C:消费者:消息的接受者,会一直等待消息到来。

queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

2.1、编写消费者

package com.littyxin;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
// 生产端没有指定交换机只有routingKey和Object。
//消费方产生hello队列,放在默认的交换机(AMQP default)上。
//而默认的交换机有一个特点,只要你的routerKey的名字与这个
//交换机的队列有相同的名字,他就会自动路由上。 
//生产端routingKey 叫hello ,消费端生产hello队列。
//他们就路由上了
@RabbitListener(queuesToDeclare = @Queue(value = "hello"))  //表示RabbitMQ消费者,声明一个队列
public class HelloConsumer {

    @RabbitHandler //当消费者从队列取出消息时的回调方法
    public void receive(String message){
        System.out.println("message = " + message);
    }
}

2.2、编写生产者测试类

package com.littyxin;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
    //注入rabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //hello world
    @Test
    public void testHelloWorld(){
        //转换和发送    1.routingKey 2.消息
        rabbitTemplate.convertAndSend("hello","hello world");
    }
}

2.3、运行生产者测试类

3、work工作模式(资源的竞争)

Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

轮询分发消息

一个生产者发送消息,由多个工作线程(消费者)轮询接收

3.1、消费者

package com.littyxin;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class WorkConsumer {

    //第一个消费者
    @RabbitListener(queuesToDeclare = @Queue("work")) //@RabbitListener在方法上代表它监听这个方法作为队列消费回调
    public void receive1(String message){
        System.out.println("message1 = " + message);
    }
    //第二个消费者
    @RabbitListener(queuesToDeclare = @Queue("work")) //@RabbitListener在方法上代表它监听这个方法作为队列消费回调
    public void receive2(String message){
        System.out.println("message2 = " + message);
    }
}

3.2、生产者

@Test
public void testWork(){
    for (int i = 0; i < 10; i++) {
        rabbitTemplate.convertAndSend("work","work模型");
    }
}

3.3、运行测试

4、fanout广播模型

publish/subscribe发布订阅(共享资源)


4.1、消费者

package com.littyxin;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FanoutConsumer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, //创建临时队列
                    exchange =@Exchange(value = "logs",type = "fanout") //绑定的交换机
            )
    })
    public void receive1(String message){
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, //创建临时队列
                    exchange =@Exchange(value = "logs",type = "fanout") //绑定的交换机
            )
    })
    public void receive2(String message){
        System.out.println("message2 = " + message);
    }
}

4.2、生产者

@Test
public void testFanout(){
    rabbitTemplate.convertAndSend("logs","","Fanout模型发送的消息");
}

4.3、运行测试

5、route路由模型

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与routing key完全匹配的队列
  • C1:消费者,其所在队列指定了需要routing key 为 error 的消息
  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列 Q1 绑定键为 orange,队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green

在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会

5.1、消费者

package com.littyxin;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RouteConsumer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,//临时队列
                    exchange = @Exchange(value = "directs",type = "direct"),//指定交换机名称和类型
                    key = {"info","error","warn"}
            )
    })
    public void receive1(String message){
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,//临时队列
                    exchange = @Exchange(value = "directs",type = "direct"),//指定交换机名称和类型
                    key = {"error"}
            )
    })
    public void receive2(String message){
        System.out.println("message2 = " + message);
    }
}

5.2、生产者

@Test
public void testRoute(){
    rabbitTemplate.convertAndSend("directs","error","发送info的key的路由信息");
}

5.3、运行测试

6、Topic动态路由模型

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

Topic 的要求

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".这种类型的。当然这个单词列表最多不能超过 255 个字节。

在这个规则列表中,其中有两个替换符需要注意:

*(星号)可以代替一个单词

#(井号)可以替代零个或多个单词

Topic 匹配案例

下图绑定关系如下:

Q1-->绑定的是

  • 中间带 orange 带 3 个单词的字符串(.orange.)

Q2-->绑定的是

  • 最后一个单词是 rabbit 的 3 个单词(..rabbit)
  • 第一个单词是 lazy 的多个单词(lazy.#)

上图是一个队列绑定关系图,我们来看看他们之间数据接收情况是怎么样的

quick.orange.rabbit------->被队列 Q1Q2 接收到

lazy.orange.elephant ------->被队列 Q1Q2 接收到
quick.orange.fox ------->被队列 Q1 接收到
lazy.brown.fox ------->被队列 Q2 接收到
lazy.pink.rabbit-------> 虽然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox ------->不匹配任何绑定不会被任何队列接收到会被丢弃

quick.orange.male.rabbit-------> 是四个单词不匹配任何绑定会被丢弃


lazy.orange.male.rabbit-------> 是四个单词但匹配 Q2

6.1、消费者

package com.littyxin;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TopicConsumer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,//临时队列
                    exchange = @Exchange(type = "topic",name = "topics"),
                    key = {"user.save","user.*"}
            )
    })
    public void receive1(String message){
        System.out.println("message1 = " + message);
    }
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,//临时队列
                    exchange = @Exchange(type = "topic",name = "topics"),
                    key = {"order.#","produce.#","user.*"}
            )
    })
    public void receive2(String message){
        System.out.println("message2 = " + message);
    }
}

6.2、生产者

@Test
public void testTopic(){
    rabbitTemplate.convertAndSend("topics","user.save","user.save 路由消息");
}

6.3、运行测试


后边会有Rabbitmq和Springboot相关的文章,关注我!

点击这里复制本文地址 以上内容由goocz整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!

果子教程网 © All Rights Reserved.  蜀ICP备2024111239号-5