Spring Cloud Stream使用详解及部分重点源码分析

Spring Cloud Stream使用详解及部分重点源码分析

解决方案goocz2025-02-01 11:35:2238A+A-

环境:Springboot2.3.12.RELEASE + Spring Cloud Hoxton.SR12 + RabbitMQ3.8.12


简介

Spring Cloud Stream是一个框架,用于构建与MQ连接的高度可伸缩的事件驱动微服务。其目的是为了简化消息在 Spring Cloud 应用程序中的开发。屏蔽了各种MQ之间的差异,使得在更换MQ的时候不需要修改代码。

Spring Cloud Stream支持多种绑定器实现,如下:

  • RabbitMQ
  • Apache Kafka
  • Kafka Streams
  • Amazon Kinesis
  • Google PubSub (partner maintained)
  • Solace PubSub+ (partner maintained)
  • Azure Event Hubs (partner maintained)
  • AWS SQS (partner maintained)
  • AWS SNS (partner maintained)
  • Apache RocketMQ (partner maintained)

详细查看官方文档,对应每一个MQ都有一个Github地址。

Spring Cloud Stream的核心构建块是:

  • 目标绑定器(Destination Binders):负责与MQ集成的组件。
  • 目标绑定(Destination Bindings):MQ中间件与最终用户提供的应用程序代码(生产者/消费者)之间的桥梁。
  • 消息(Message):生产者和消费者用来与目标绑定器(以及通过MQ与其他应用程序)通信的规范数据结构。

快速入门

依赖:

<properties>
  <spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
  </dependency>
</dependencies>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-dependencies</artifactId>
      <version>${spring-cloud.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

应用配置:

spring:
  rabbitmq:
    host: localhost
    virtual-host: bus
    port: 5672
    username: xxx
    password: xxx
---
spring:
  cloud:
    stream:
      bindings:
        #自定义输入输出
        myInput:
          #指定输入通道对应的主题名
          destination: demo
        myOutput:
          destination: demo

创建消息通道绑定的接口:

public interface StreamBinding {
 
  String INPUT = "myInput";
  String OUTPUT = "myOutput";
 
  @Input(StreamBinding.INPUT)
  SubscribableChannel input();
 
  @Output(StreamBinding.OUTPUT)
  MessageChannel output();
}

通过 @Input和 @Output注解定义输入通道和输出通道名称,这里的名称与上面配置文件中的是对应的。

当定义输出通道的时候,需要返回 MessageChannel 接口对象,该接口定义了向消息通道发送消息的方法;定义输入通道时,需要返回 SubscribableChannel 接口对象,该接口集成自 MessageChannel 接口,它定义了维护消息通道订阅者的方法。

这里的Input,Output两个方法容器会分别创建一个Bean对象

创建消费者:

@Component
@EnableBinding(value = {StreamBinding.class})
public class StreamReceiver {
 
  private Logger logger = LoggerFactory.getLogger(StreamReceiver.class);

  @StreamListener(StreamBinding.INPUT)
  public void receive(String message) {
    logger.info("接收到消息: {}", message);
  }
}

@EnableBinding 注解用来指定一个或多个定义了 @Input 或 @Output 注解的接口,以此实现对消息通道(Channel)的绑定。上面我们通过 @EnableBinding(value = {StreamClient.class}) 绑定了 StreamClient 接口,该接口是我们自己实现的对输入输出消息通道绑定的定义

@StreamListener,主要定义在方法上,作用是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名。上面我们将 receive 方法注册为 myInput 消息通道的监听处理器,当我们往这个消息通道发送信息的时候,receiver 方法会执行。

消息发送接口:

@Resource
private StreamBinding streamBinding;
@GetMapping("/send")
public void send() {
  streamBinding.output().send(MessageBuilder.withPayload("First Message...").build());
}

启动服务:

查看RabbitMQ

自动为我们创建了一个队列,队列的名称是以我们在配置文件中配置的开头,后面是随机生成的。这个队列会自动删除AD,服务关闭后就自动删除队列;Excl:排他的,存在该队列就不会在创建了。

修改端口后,再启动一个服务:

创建了2个队列,使用其中一个发送消息:

两个服务都收到了消息。

消费者组

上面启动了2个服务都能收到消息,在集群的环境下这样肯定会带来问题,如果是业务方面的就会出现重复数据,这时候我们可以通过设置分组的解决此问题。修改配置:

spring:
  cloud:
    stream:
      bindings:
        myInput:
          #指定输入通道对应的主题名
          destination: demo
          #指定一个组;指定分组以后,不管你启动多少个实例,所有的实例都监听这一个队列
          #多个实例会轮询的接收消息
          group: g_test
        myOutput:
          destination: demo

再次启动服务后,两个服务会轮询的接收到消息。

启动服务后,两个服务都同时监听同一个队列。队列也不是随机生成的了,并且队列是持久化的,服务断开后队列也不会自动删除。

消息分区

通过消费组的设置,虽然能保证同一消息只被一个消费者进行接收和处理,但是对于特殊业务情况,除了要保证单一实例消费之外,还希望那些具备相同特征的消息都能被同一个实例消费,这个就可以使用 Spring Cloud Stream 提供的消息分区功能。修改配置

spring:
  cloud:
    stream:
      bindings:
        myInput:
          #指定输入通道对应的主题名
          destination: demo
          #指定一个组;指定分组以后,不管你启动多少个实例,所有的实例都监听这一个队列
          #多个实例会轮询的接收消息
          group: g_test
          consumer:
            #通过该参数开启消费者分区功能
            partitioned: true
        myOutput:
          destination: demo
          producer:
            #这里的配置也可以是SpEL表达式,比如:headers['partition']通过消息header获取属性
            #这里会通过表达式及消息对象进行计算得到一个Key,然后获取key的hashCode
            # 得到hashCode以后会与partitionCount进行取模运算得到具体的分区
            partitionKeyExpression: '1' #我这里给的值就是对应的instanceIndex的值,你希望谁接收就设置谁配置的值即可
            partitionCount: 2
      #实例总数
      instanceCount: 2
      #该参数设置了当前实例的索引号,从 0 开始
      instanceIndex: 0

计算分区源码:

最后得到分区信息后会在消息头中放入一个scst_partition为key,partition为值的头信息。

启动多个实例后,测试发现所有的消息都只是同一个实例收到消息

交换机分别与每一个服务进行绑定使用不同的Routing Key这样在发送消息的时候就可以根据计算处理的分区进行定向发送消息了。

通过源码查看:

这里通过我们的配置交换机为demo。接着是获取路由key了

这里会从消息header中获取key = scst_partition的头信息。

这样针对使用RabbitMQ的中间件发送消息所需要的交换机及路由key就确定下来了。

完毕!!!

在Spring Cloud 中你还在使用Ribbon快来试试Load-Balancer

Spring Cloud Gateway应用详解1之谓词

Spring Cloud Bus使用说明详解

SpringCloud Nacos 整合feign

SpringCloud Alibaba 之 Nacos 服务

Spring Cloud Sentinel整合Feign

SpringCloud Hystrix实现资源隔离应用

SpringCloud zuul 动态网关配置

Spring Cloud 微服务日志收集管理Elastic Stack完整详细版

SpringCloud Feign实现原理源分析

Spring Cloud Sentinel 热点参数限流

点击这里复制本文地址 以上内容由goocz整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!

果子教程网 © All Rights Reserved.  蜀ICP备2024111239号-5