消息驱动Stream
视频教程中讲解的比较浅显,推荐一篇博文:https://blog.csdn.net/weixin_38399962/article/details/82192340
背景介绍
RabbitMQ、Kafka 这些中间件的差异导致我们实际项目开发会遇到一定的困扰,我们如果用了两个消息队列其中的一种,后续的业务需求,我们想往另外一种消息队列进行迁移,这无疑是灾难性的,一大堆东西都要重新推倒重新做,因为它和我们的系统耦合了,SpringCloud Stream提供了一种解耦的方式。
SpringCloud Stream简介
Spring Cloud Stream 是一个构建消息驱动微服务的框架。
Spring Cloud Stream构建在SpringBoot之上,提供了Kafka,RabbitMQ等消息中间件的个性化配置,引入了发布订阅、消费组和分区的语义概念,有效的简化了上层研发人员对MQ使用的复杂度,让开发人员更多的精力投入到核心业务的处理。
在实际开发过程中,服务与服务之间通信经常会使用到消息中间件,而以往使用了哪个中间件比如RabbitMQ,那么该中间件和系统的耦合性就会非常高,如果我们要替换为Kafka那么变动会比较大,使用Spring Cloud Stream来整合我们的消息中间件,可以降低系统和中间件的耦合性。
SpringCloud Stream作用
无感知的使用消息中间件
Stream解决了开发人员无感知的使用消息中间件的问题,因为Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知。
中间件和服务的高度解耦
Spring Cloud Stream进行了配置隔离,只需要调整配置,开发中可以动态的切换中间件(如rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。
通过定义绑定器Binder作为中间层,实现了应用程序与消息和中间件细节之间的解耦。INPUT对应消费者、OUTPUT对象生产者。
应用模型
Spring Cloud Stream由一个中立的中间件内核组成。Spring Cloud Stream会注入输入和输出的channels,应用程序通过这些channels与外界通信,而channels则是通过一个明确的中间件Binder与外部brokers连接。
核心概念
- 发布/订阅
简单的讲就是一种生产者,消费者模式。发布者是生产,将输出发布到数据中心,订阅者是消费者,订阅自己感兴趣的数据。当有数据到达数据中心时,就把数据发送给对应的订阅者。
- 消费组
直观的理解就是一群消费者一起处理消息。需要注意的是:每个发送到消费组的数据,仅由消费组中的一个消费者处理。
- 分区
类比于消费组,分区是将数据分区。举例:某应用有多个实例,都绑定到同一个数据中心,也就是不同实例都将数据发布到同一个数据中心。分区就是将数据中心的数据再细分成不同的区。为什么需要分区?因为即使是同一个应用,不同实例发布的数据类型可能不同,也希望这些数据由不同的消费者处理。这就需要,消费者可以仅订阅一个数据中心的部分数据。这就需要分区这个东西了。
流程说明
- binder:很方便的连接中间件;屏蔽差异
- Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介。通过Channel对队列进行配置。
- Source&Sink:参照对象是SpringCloud Stream,对Stream发布消息就是输出,从Stream接受消息就是输入。
操作案例
创建消费生产者8801
创建Maven项目
创建名为:cloud-stream-mq-provider-8801的maven项目
修改pom.xml
<artifactId>cloud-stream-mq-provider-8801</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
创建application.yml
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
bindings:
output:
destination: studyExchange
content-type: application/json
# binder: defaultRabbit
eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka
instance:
instance-id: cloud-stream-provider-8801
prefer-ip-address: true
编写启动类
@SpringBootApplication
public class CloudStreamProvider8801Application {
public static void main(String[] args) {
SpringApplication.run(CloudStreamProvider8801Application.class,args);
}
}
编写业务类
-
service 接口
public interface MessageProvider { public String send(); }
-
service 实现类
@EnableBinding(Source.class) public class MessageProviderImpl implements MessageProvider { @Resource private MessageChannel output; //消息发送管道 @Override public String send() { String randomStr = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(randomStr).build()); System.out.println("**********************Str:"+randomStr); return randomStr; } }
-
controller 调用消息发送
@RestController @RequestMapping("/stream") public class SendMessageController { @Resource MessageProvider messageProvider; @RequestMapping("/send") public String send(){ return messageProvider.send(); } }
测试
访问:http://localhost:15672/#/
然后浏览器刷新:http://localhost:8801/stream/send 可以得到如下图所示的监控界面。
创建消费消费者8802
创建Maven项目
创建名为:cloud-stream-mq-consumer-8802 的项目
修改POM.XML
<artifactId>cloud-stream-mq-consumer-8802</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
创建application.yml
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
bindings:
input: #改成消费者通道
destination: studyExchange
content-type: application/json
# binder: defaultRabbit
eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka
instance:
instance-id: cloud-stream-provider-8801
prefer-ip-address: true
创建启动类
@SpringBootApplication
public class CloudConsumer8802Application {
public static void main(String[] args) {
SpringApplication.run(CloudConsumer8802Application.class,args);
}
}
创建业务类
创建消费者消费业务类
@Component
@EnableBinding(Sink.class)
public class MessageConsumerListener {
@Value("${server.port}")
private String port;
@StreamListener(Sink.INPUT)
public void input(Message <String> message){
System.out.println("消费者["+port+"]接收到的消息为:"+message.getPayload());
}
}
测试
访问:http://localhost:8801/stream/send
观察消费者控制台打印信息:
查看RabbitMQ后台监控界面:
创建消费消费者8803
创建maven项目
创建名为:cloud-stream-mq-consumer-8803 的maven项目
修改pom.xml文件
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
创建application.yml
server:
port: 8803
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
bindings:
input:
destination: studyExchange
content-type: application/json
# binder: defaultRabbit
# group: consumerGroupOne
eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka
instance:
instance-id: cloud-stream-consumer-8803
prefer-ip-address: true
创建启动类
@SpringBootApplication
public class CloudConsumer8803Application {
public static void main(String[] args) {
SpringApplication.run(CloudConsumer8803Application.class,args);
}
}
创建业务类
@Component
@EnableBinding(Sink.class)
public class MessageConsumerListener {
@Value("${server.port}")
private String port;
@StreamListener(Sink.INPUT)
public void input(Message <String> message){
System.out.println("消费者["+port+"]接收到的消息为:"+message.getPayload());
}
}
测试
启动后;访问http://localhost:8801/stream/send 可以在8802&8803后台都可以看到接收到的消息。
使用分组解决重复消费问题
在Stream中同一个组(Group)中的多个消费者是竞争关系,就能确保消息只会被其中一个应用消费一次。不同组之间消息可以重复消费。
上图中的bindings对应的id和8802&8803启动时大约的日志对应。
为了解决重复消费的问题;我们使用一个配置来解决。
如何配置?
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
bindings:
input:
destination: studyExchange
content-type: application/json
group: consumerGroupOne #为8802 & 8803 设置同一个group
eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka
instance:
instance-id: cloud-stream-consumer-8802
prefer-ip-address: true
测试
再次访问http://localhost:8801/stream/send 发送10次消息;会看到被分散到8802&8803中去。
如何持久化
设置group属性后便可以支持持久化;
测试
注销掉8802项目 application.yml 中 group属性
- 启动7001 & 7002
- 启动8801,访问5次http://localhost:8801/stream/send
- 启动8802 查看控制台并未打印消息;
- 启动8803 查看控制台打印出发送的消息。
本文由 huzd 创作,采用 知识共享署名4.0 国际许可协议进行许可本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名最后编辑时间
为:
2021/03/30 00:46