【SpringCloud学习笔记】消息驱动Stream

/ 微服务 / 没有评论 / 2080浏览

消息驱动Stream

视频教程中讲解的比较浅显,推荐一篇博文:https://blog.csdn.net/weixin_38399962/article/details/82192340

背景介绍

RabbitMQ、Kafka 这些中间件的差异导致我们实际项目开发会遇到一定的困扰,我们如果用了两个消息队列其中的一种,后续的业务需求,我们想往另外一种消息队列进行迁移,这无疑是灾难性的,一大堆东西都要重新推倒重新做,因为它和我们的系统耦合了,SpringCloud Stream提供了一种解耦的方式。

SpringCloud Stream简介

Spring Cloud Stream 是一个构建消息驱动微服务的框架。

Spring Cloud Stream构建在SpringBoot之上,提供了Kafka,RabbitMQ等消息中间件的个性化配置,引入了发布订阅、消费组和分区的语义概念,有效的简化了上层研发人员对MQ使用的复杂度,让开发人员更多的精力投入到核心业务的处理。

在实际开发过程中,服务与服务之间通信经常会使用到消息中间件,而以往使用了哪个中间件比如RabbitMQ,那么该中间件和系统的耦合性就会非常高,如果我们要替换为Kafka那么变动会比较大,使用Spring Cloud Stream来整合我们的消息中间件,可以降低系统和中间件的耦合性。

SpringCloud Stream作用

无感知的使用消息中间件

Stream解决了开发人员无感知的使用消息中间件的问题,因为Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知。

中间件和服务的高度解耦

Spring Cloud Stream进行了配置隔离,只需要调整配置,开发中可以动态的切换中间件(如rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

通过定义绑定器Binder作为中间层,实现了应用程序与消息和中间件细节之间的解耦。INPUT对应消费者、OUTPUT对象生产者。

image-20210327173715491

应用模型

img

Spring Cloud Stream由一个中立的中间件内核组成。Spring Cloud Stream会注入输入和输出的channels,应用程序通过这些channels与外界通信,而channels则是通过一个明确的中间件Binder与外部brokers连接。

核心概念

  1. 发布/订阅

简单的讲就是一种生产者,消费者模式。发布者是生产,将输出发布到数据中心,订阅者是消费者,订阅自己感兴趣的数据。当有数据到达数据中心时,就把数据发送给对应的订阅者。

  1. 消费组

直观的理解就是一群消费者一起处理消息。需要注意的是:每个发送到消费组的数据,仅由消费组中的一个消费者处理。

  1. 分区

类比于消费组,分区是将数据分区。举例:某应用有多个实例,都绑定到同一个数据中心,也就是不同实例都将数据发布到同一个数据中心。分区就是将数据中心的数据再细分成不同的区。为什么需要分区?因为即使是同一个应用,不同实例发布的数据类型可能不同,也希望这些数据由不同的消费者处理。这就需要,消费者可以仅订阅一个数据中心的部分数据。这就需要分区这个东西了。

流程说明

image-20210327174043793

操作案例

创建消费生产者8801

创建Maven项目

创建名为:cloud-stream-mq-provider-8801的maven项目

修改pom.xml
		<artifactId>cloud-stream-mq-provider-8801</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
创建application.yml
server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
                virtual-host: /
      bindings:
        output:
          destination: studyExchange
          content-type: application/json
#          binder: defaultRabbit

eureka:
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka
  instance:
    instance-id: cloud-stream-provider-8801
    prefer-ip-address: true
编写启动类
@SpringBootApplication
public class CloudStreamProvider8801Application {
    public static void main(String[] args) {
        SpringApplication.run(CloudStreamProvider8801Application.class,args);
    }
}
编写业务类
测试

访问:http://localhost:15672/#/

然后浏览器刷新:http://localhost:8801/stream/send 可以得到如下图所示的监控界面。

image-20210327190154298

创建消费消费者8802

创建Maven项目

创建名为:cloud-stream-mq-consumer-8802 的项目

修改POM.XML
    <artifactId>cloud-stream-mq-consumer-8802</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
创建application.yml
server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
                virtual-host: /
      bindings:
        input:  #改成消费者通道
          destination: studyExchange
          content-type: application/json
#          binder: defaultRabbit

eureka:
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka
  instance:
    instance-id: cloud-stream-provider-8801
    prefer-ip-address: true
创建启动类
@SpringBootApplication
public class CloudConsumer8802Application {
    public static void main(String[] args) {
        SpringApplication.run(CloudConsumer8802Application.class,args);
    }
}
创建业务类

创建消费者消费业务类

@Component
@EnableBinding(Sink.class)
public class MessageConsumerListener {
    @Value("${server.port}")
    private String port;

    @StreamListener(Sink.INPUT)
    public void input(Message <String> message){
        System.out.println("消费者["+port+"]接收到的消息为:"+message.getPayload());
    }
}

测试

访问:http://localhost:8801/stream/send

观察消费者控制台打印信息:

image-20210327193110143

查看RabbitMQ后台监控界面:

image-20210327192420599

创建消费消费者8803

创建maven项目

创建名为:cloud-stream-mq-consumer-8803 的maven项目

修改pom.xml文件
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>

创建application.yml
server:
  port: 8803

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
                virtual-host: /
      bindings:
        input:
          destination: studyExchange
          content-type: application/json
#          binder: defaultRabbit
#          group: consumerGroupOne

eureka:
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka
  instance:
    instance-id: cloud-stream-consumer-8803
    prefer-ip-address: true
创建启动类
@SpringBootApplication
public class CloudConsumer8803Application {
    public static void main(String[] args) {
        SpringApplication.run(CloudConsumer8803Application.class,args);
    }
}
创建业务类
@Component
@EnableBinding(Sink.class)
public class MessageConsumerListener {
    @Value("${server.port}")
    private String port;

    @StreamListener(Sink.INPUT)
    public void input(Message <String> message){
        System.out.println("消费者["+port+"]接收到的消息为:"+message.getPayload());
    }
}
测试

启动后;访问http://localhost:8801/stream/send 可以在8802&8803后台都可以看到接收到的消息。

使用分组解决重复消费问题

在Stream中同一个组(Group)中的多个消费者是竞争关系,就能确保消息只会被其中一个应用消费一次。不同组之间消息可以重复消费。

image-20210329170833496

上图中的bindings对应的id和8802&8803启动时大约的日志对应。

image-20210329170858046 image-20210329170919048

为了解决重复消费的问题;我们使用一个配置来解决。

如何配置?

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
                virtual-host: /
      bindings:
        input:
          destination: studyExchange
          content-type: application/json
          group: consumerGroupOne #为8802 & 8803 设置同一个group

eureka:
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka
  instance:
    instance-id: cloud-stream-consumer-8802
    prefer-ip-address: true

测试

再次访问http://localhost:8801/stream/send 发送10次消息;会看到被分散到8802&8803中去。

如何持久化

设置group属性后便可以支持持久化;

测试

注销掉8802项目 application.yml 中 group属性

  1. 启动7001 & 7002
  2. 启动8801,访问5次http://localhost:8801/stream/send
  3. 启动8802 查看控制台并未打印消息;
  4. 启动8803 查看控制台打印出发送的消息。