Kevin's blog Kevin's blog
首页
  • AI基础
  • RAG技术
  • 提示词工程
  • Wireshark抓包
  • 常见问题
  • 数据库
  • 代码技巧
  • 浏览器
  • 手册教程
  • 技术应用
  • 流程规范
  • github技巧
  • git笔记
  • vpn笔记
  • 知识概念
  • 学习笔记
  • 环境搭建
  • linux&运维
  • 微服务
  • 经验技巧
  • 实用手册
  • arthas常用
  • spring应用
  • javaAgent技术
  • 网站
友情链接
  • 分类
  • 标签
  • 归档

Kevin

你可以迷茫,但不可以虚度
首页
  • AI基础
  • RAG技术
  • 提示词工程
  • Wireshark抓包
  • 常见问题
  • 数据库
  • 代码技巧
  • 浏览器
  • 手册教程
  • 技术应用
  • 流程规范
  • github技巧
  • git笔记
  • vpn笔记
  • 知识概念
  • 学习笔记
  • 环境搭建
  • linux&运维
  • 微服务
  • 经验技巧
  • 实用手册
  • arthas常用
  • spring应用
  • javaAgent技术
  • 网站
友情链接
  • 分类
  • 标签
  • 归档
  • 手册教程

  • 技术应用

    • 工作杂货

      • Jprofiler排查
      • ES查询压测
      • 阿尔萨斯(Arthas)
      • 定时任务
      • DruidDataSource配置
      • Spring Cloud Stream
        • 1 简介
          • 1.1 概述
        • 2 快速搭建
          • 2.1 引入依赖
        • 3 开发指南
          • 3.1 apollo 增加配置stream.yml
          • 3.2: SpringBoot项目启动类,添加注解SpringBootApplication,EnableApolloConfig
          • 3.3: 创建消息通道绑定的接口
          • 3.4 发送消息
          • 3.5 接收消息
          • 3.6 接口测试
      • 线上Tomcat配置参考
      • 配置Prometheus及健康检测
      • Feign远程调用
      • Hystrix单方法熔断配置
      • 本地开发联调配置
      • Java代码杂记
      • SQL脚本杂记
      • 批量算费本地工具类
      • Apollo配置模糊查询
      • 开发问题记录
      • 机器配置参考
    • 技巧备忘

  • 流程规范

  • GitHub技巧

  • VPN

  • Git笔记

  • 实用手册
  • 技术应用
  • 工作杂货
luoxiaofeng
2022-05-05
目录

Spring Cloud Stream

# 1 简介

# 1.1 概述

Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。
Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。
通过使用 Spring Cloud Stream,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。

提示

目前 Spring Cloud Stream 只支持 RabbitMQ 和 Kafka 的自动化配置。

# 2 快速搭建

我们通过一个简单的示例对 Spring Cloud Stream 有一个初步的认识。中间件使用 RabbitMQ,创建 spring-cloud-stream 模块。

# 2.1 引入依赖

编辑 pom.xml 文件,引入 Spring Cloud Stream 对 RabbitMQ 支持的 spring-cloud-starter-stream-rabbit 依赖,该依赖包是 Spring Cloud Stream 对 RabbitMQ 支持的封装,其中包含了对 RabbitMQ 的自动化配置等内容。

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
1
2
3
4
5
6
7
8
9
10
11
12

# 3 开发指南

# 3.1 apollo 增加配置stream.yml

spring:
  cloud:
    stream:
      bindings:
       #输入通道名称,对应java代码InputInterface定义的名称
        rabbit-mq-demo-test-input:
          #通道主题名
          destination: rabbit-mq-demo-test
          contentType: application/json
          #消费组名称, 多节点消费保证唯一
          group: rabbit-mq-demo-test
          #绑定的QM配置
          binder: rabbit-test
          consumer:
            enable-auto-commit: false
            auto-offset-reset: earliest
            max-poll-records: 1000
            concurrency: 6

        rabbit-mq-demo-test-output:
          destination: rabbit-mq-demo-test
          contentType: application/json
          group: 
          binder: rabbit-test
          consumer:
            enable-auto-commit: false
            auto-offset-reset: earliest
            max-poll-records: 1000
            concurrency: 6
            
        demo-test-input:
          destination: demo-test
          contentType: application/json
          group: demo-test
          binder: kafka-test
          consumer:
            enable-auto-commit: false
            auto-offset-reset: earliest
            max-poll-records: 1000
            concurrency: 6
            
        demo-test-output:
          destination: demo-test
          contentType: application/json
          group: demo-test
          binder: kafka-test
          consumer:
            enable-auto-commit: false
            auto-offset-reset: earliest
            max-poll-records: 1000
            concurrency: 6
      #默认配置, 当存在多个配置事必须配置, 否则获取不到MQ配置
      default-binder: rabbit-test    
      binders:
      #RabbitMQ配置
        rabbit-test:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 127.0.0.1
                port: 5672
                username: guest
                password: guest
                virtual-host: /
        #Kafka配置     
        kafka-test:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: 127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092
                      auto-add-partitions: true
                      auto-create-topics: true
                      min-partition-count: 1
                      replication-factor: 3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79

# 3.2: SpringBoot项目启动类,添加注解SpringBootApplication,EnableApolloConfig

@EnableApolloConfig({"stream.yml"})
@SpringBootApplication(exclude = {RabbitAutoConfiguration.class}) 
public class PlatformDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(PlatformDemoApplication.class, args);
    }

}
1
2
3
4
5
6
7
8
9

屏蔽Rabbit org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect 错误

# 3.3: 创建消息通道绑定的接口

创建 InputInterface 接口,通过 @Input 注解定义输入通道和输出通道,另外,@Input 注解都还有一个 value 属性,该属性可以用来设置消息通道的名称,

RabbitMQ: rabbit-mq-demo-test-input

Kafka: demo-test-input

@Component
public interface InputInterface {

    //RabbitMQ接收者通道
    String RABBIT_MQ_DEMO_TEST_INPUT = "rabbit-mq-demo-test-input";

    //Kafka接收者通道
    String KAFKA_DEMO_TEST_INPUT = "demo-test-input";


    @Input(RABBIT_MQ_DEMO_TEST_INPUT)
    SubscribableChannel rabbitMQInput();

    @Input(KAFKA_DEMO_TEST_INPUT)
    SubscribableChannel kafkaSendInput();

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

创建 OutputInterface接口,通过@Output 注解定义输入通道和输出通道,另外@Output 注解都还有一个 value 属性,该属性可以用来设置消息通道的名称,

RabbitMQ: rabbit-mq-demo-test-output

Kafka: demo-test-output

@Component
public interface OutputInterface {

    //RabbitMQ接收者通道
    String RABBIT_MQ_DEMO_TEST_INPUT = "rabbit-mq-demo-test-output";

    //Kafka接收者通道
    String KAFKA_DEMO_TEST_INPUT = "demo-test-output";


    @Output(RABBIT_MQ_DEMO_TEST_INPUT)
    SubscribableChannel rabbitMQSendMessage();

    @Output(KAFKA_DEMO_TEST_INPUT)
    SubscribableChannel kafkaSendMessage();

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 3.4 发送消息

创建测试消息实体MessageDTO

@Data
public class MessageDTO {

    /**
     * ID
     */
    private Integer id;
    /**
     * 编码
     */
    private String code;
    /**
     * 名称
     */
    private String name;
    /**
     * 模块名称
     */
    private String module;
    /**
     * 操作类型
     */
    private String operation;
    /**
     * 冗余字段
     */
    private String json;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

创建MqMessageService接口以及MqServiceImpl实现类

public interface MqMessageService {

    boolean sendRabbitMqMessage(MessageDTO dto);

    boolean sendKafkaMessage(MessageDTO dto);

}
@Slf4j
@Service
@EnableBinding(value = {OutputInterface.class})
public class MqMessageServiceImpl implements MqMessageService {

    @Autowired
    private OutputInterface outputInterface;

    @Override
    public boolean sendRabbitMqMessage(MessageDTO dto) {
        Message message = MessageBuilder.withPayload(dto).build();
        return outputInterface.rabbitMQSendMessage().send(message);
    }

    @Override
    public boolean sendKafkaMessage(MessageDTO dto) {
        Message message = MessageBuilder.withPayload(dto).build();
        return outputInterface.kafkaSendMessage().send(message);
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# 3.5 接收消息

创建监听器InputStreamListener

@Slf4j
@EnableBinding(InputInterface.class)
@Component
public class InputStreamListener {

    @StreamListener(value = InputInterface.RABBIT_MQ_DEMO_TEST_INPUT)
    public void showRabbitMQMessage(@Payload MessageDTO dto) {
        log.info("showRabbitMQMessage message :[{}]", dto);

    }
    
    @StreamListener(value = InputInterface.KAFKA_DEMO_TEST_INPUT)
    public void showKafkaMessage(@Payload MessageDTO dto) {
        log.info("showKafkaMessage message:{}", dto);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 3.6 接口测试

1.创建MQController

@Api(value = "MQController", tags = "MQ测试")
@RequestMapping("/mqDemo")
@RestController
@Slf4j
public class MQController {

    @Autowired
    private MqMessageService mqMessageService;


    @ApiOperation(value = "发送RabbitMQ消息", notes = "发送RabbitMQ消息")
    @PostMapping("sendRabbitMqMessage")
    public Result<Boolean> sendRabbitMqMessage(@Valid @RequestBody MessageDTO messageDTO) {
        return Result.success(mqMessageService.sendRabbitMqMessage(messageDTO));
    }

    @ApiOperation(value = "发送Kafka消息", notes = "发送Kafka消息")
    @PostMapping("sendKafkaMessage")
    public Result<Boolean> sendKafkaMessage(@Valid @RequestBody MessageDTO messageDTO) {
        return Result.success(mqMessageService.sendKafkaMessage(messageDTO));
    }
    
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

2.Postman测试

RabbitMq

发送接口:http://localhost:8080/demoapi/mqDemo/sendRabbitMqMessage

测试数据

{
    "id":1,
    "code":"code",
    "name":"name",
    "module":"module",
    "operation":"INSERT",
    "json":"发送RabbitMQ数据"

}
1
2
3
4
5
6
7
8
9

监听器接收RabbitMQ数据

c.y.p.d.m.l.InputStreamListener - showRabbitMQMessage message :[MessageDTO(id=1, code=code, name=name, module=module, operation=INSERT, json=发送RabbitMQ数据)]
1

Kafka

发送接口:http://localhost:8080/demoapi/mqDemo/sendKafkaMessage

{
    "id":2,
    "code":"code",
    "name":"name",
    "module":"module",
    "operation":"INSERT",
    "json":"发送Kafka数据"

}
1
2
3
4
5
6
7
8
9

监听器接收Kafka数据

INFO  c.y.p.d.m.l.InputStreamListener - showKafkaMessage message:MessageDTO(id=2, code=code, name=name, module=module, operation=INSERT, json=发送Kafka数据)
1
#Spring Cloud
上次更新: 2022/10/24, 18:37:43
DruidDataSource配置
线上Tomcat配置参考

← DruidDataSource配置 线上Tomcat配置参考→

最近更新
01
AI是如何学习的
06-05
02
chatGpt提示原则
06-05
03
提示词工程实践指南
06-05
更多文章>
| Copyright © 2022-2025 Kevin | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式