游侠的博客 游侠的博客
首页
  • 论文笔记
  • 一些小知识点

    • pytorch、numpy、pandas函数简易解释
  • 《深度学习500问》
开发
技术
更多
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

Ranger

一名在校研究生
首页
  • 论文笔记
  • 一些小知识点

    • pytorch、numpy、pandas函数简易解释
  • 《深度学习500问》
开发
技术
更多
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • Vue

  • SpringBoot2

  • JavaWeb

  • SSM

  • SpringBoot3

    • 一、SpringBoot3快速入门
    • 二、SpringBoot核心机制了解
    • 三、SpringBoot的核心技能
    • 四、WebMvcAutoConfiguration原理
    • 五、SpringBootWeb开发-Web场景
    • 六、SpringBootWeb开发-静态资源与路径匹配
    • 七、SpringBootWeb开发-内容协商
    • 八、SpringBootWeb开发-模板引擎
    • 九、SpringBootWeb开发-国际化与错误处理
    • 十、SpringBootWeb开发-嵌入式容器
    • 十一、SpringBootWeb开发-SpringMVC所有常用特性配置
    • 十二、SpringBootWeb开发-Web新特性
    • 十三、SpringBoot-整合SSM场景
    • 十四、SpringBoot-基础特性
    • 十五、SpringBoot-核心原理
    • 十六、SpringBoot场景整合-环境准备
    • 十七、SpringBoot场景整合-Redis
    • 十八、SpringBoot场景整合-接口文档
    • 十九、SpringBoot场景整合-远程调用
    • 二十、SpringBoot场景整合-消息服务
      • 消息队列-场景
        • 1. 异步
        • 2. 解耦
        • 3. 削峰
        • 4. 缓冲
      • 消息队列-Kafka
        • 1. 消息模式
        • 2. Kafka工作原理
        • 3. SpringBoot整合
        • 4. 消息发送
        • 5. 消息监听
        • 6. 参数配置
        • 7. 自动配置原理
    • 二十一、SpringBoot场景整合-Web安全
    • 二十二、SpringBoot场景整合-可观测性
    • 二十三、SpringBoot场景整合-AOT
  • 技术
  • SpringBoot3
yangzhixuan
2023-06-16
目录

二十、SpringBoot场景整合-消息服务

https://kafka.apache.org/documentation/

# 消息队列-场景

# 1. 异步

如下图所示,使用异步能够提升效率,但和之前的事件机制一样,如果不引入消息,那么后续增加业务需求需要修改原来的代码,但加入消息机制后,我们只需要在新的业务上添加监听消息的代码就能很好的引入。(对增加开启,对修改关闭)

image

# 2. 解耦

如果库存系统的接口或其他进行了一定的更改,那么我们的订单系统也需要进行更改,而使用消息监听就不会有这种问题

image

# 3. 削峰

将请求放入消息队列中排队,业务处理服务器根据自身能力一个个处理,而不是一股脑全将请求交给服务器处理,避免服务器宕机

image

# 4. 缓冲

将所有的日志文件放进消息队列中做缓存,日志分析处理模块有多大的处理能力就处理多少,处理完一批再进行下一批

image

# 消息队列-Kafka

Kafka是一个消息队列服务器,获取消息叫做订阅或监听

消息的发送者叫做生产者,消息的接受者叫做消费者

# 1. 消息模式

  • 点对点模式:

这种模式生产者和消费者都只有一个,生产者发送什么消息,消费者就会接收什么消息,且在确认接受到消息之后会向消息队列发送一个确认信息,让消息队列删除该消息,一条条消息接收,直到消息被清空

  • 发布订阅模式:

消息分为不同的主题,会有很多消费者订阅不同的主题,消息队列会将主题里的消息广播给订阅了的消费者,且队列里的消息不清除。如何知道每个消费者此时处理到哪个消息呢?会为每个消费者保存一个偏移量来记录这个信息。

image

# 2. Kafka工作原理

  • Broker:服务器结点

  • Partitions:分区,将数据分散存储

  • Replicas:副本,在其他机器上进行备份

一份大小为100T的数据被分成了三份,每一份有两个副本,存储情况如下

消费者组:

  • 同一个组里的消费者是竞争关系,如图中示例,消费者1、2、3分别消费1分区、2分区、3分区的消息,如果此时在该消费者组中引入第四个消费者,那么这个消费者哪个分区的消息都不能消费
  • 不同组之间的消费者就是发布订阅模式

image

# 3. SpringBoot整合

参照:https://docs.spring.io/spring-kafka/docs/current/reference/html/#preface

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
1
2
3
4

配置

spring.kafka.bootstrap-servers=172.20.128.1:9092
1
/**
 * KafkaAutoConfiguration提供如下功能
 * 1、KafkaProperties:kafka的所有配置; 以 spring.kafka开始
 *    - bootstrapServers: kafka集群的所有服务器地址
 *    - properties: 参数设置
 *    - consumer: 消费者
 *    - producer: 生产者
 *    ...
 * 2、@EnableKafka: 开启Kafka的注解驱动功能
 * 3、KafkaTemplate: 收发消息
 * 4、KafkaAdmin: 维护主题等...
 * 5、@EnableKafka +  @KafkaListener 接受消息
 *    1)消费者来接受消息,需要有group-id
 *    2)收消息使用 @KafkaListener + ConsumerRecord(保存封装的消息,可以取出key、value)
 *    3)spring.kafka 开始的所有配置
 * 6、核心概念
 *    分区:  分散存储,1T的数据分散到N个节点
 *    副本:  备份机制,每个小分区的数据都有备份
 *    主题: topics; 消息是发送给某个主题
 */
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

修改C:\Windows\System32\drivers\etc\hosts文件,配置8.130.32.70 kafka,因为服务器那边是有kafka这个域名对应着结点的。

# 4. 消息发送

@SpringBootTest
class Boot07KafkaApplicationTests {

    @Autowired
    KafkaTemplate kafkaTemplate;
    @Test
    void contextLoads() throws ExecutionException, InterruptedException {
        StopWatch watch = new StopWatch();
        watch.start();
        CompletableFuture[] futures = new CompletableFuture[10000];
        for (int i = 0; i < 10000; i++) {
            CompletableFuture send = kafkaTemplate.send("order", "order.create."+i, "订单创建了:"+i);
            futures[i]=send;
        }
        CompletableFuture.allOf(futures).join();
        watch.stop();
        System.out.println("总耗时:"+watch.getTotalTimeMillis());
    }

}
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void someMethod() {
        this.kafkaTemplate.send("someTopic", "Hello");
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

# 5. 消息监听

@Component
public class OrderMsgListener {

    @KafkaListener(topics = "order",groupId = "order-service")
    public void listen(ConsumerRecord record){
        System.out.println("收到消息:"+record); //可以监听到发给kafka的新消息,以前的拿不到
    }

    @KafkaListener(groupId = "order-service-2",topicPartitions = {
            @TopicPartition(topic = "order",partitionOffsets = {
                    @PartitionOffset(partition = "0",initialOffset = "0")
            })
    })
    public void listenAll(ConsumerRecord record){
        System.out.println("收到partion-0消息:"+record);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 6. 参数配置

消费者

# 配置序列化器,将对象转json,下面这个是值的序列化器配置,另外还有键的序列化器
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
1
2
3
4

生产者

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
1
2

# 7. 自动配置原理

kafka 自动配置在KafkaAutoConfiguration

  1. 容器中放了 KafkaTemplate 可以进行消息收发

  2. 容器中放了KafkaAdmin 可以进行 Kafka 的管理,比如创建 topic 等

        @Bean
        public NewTopic topic1() {
            return TopicBuilder.name("thing")
                    .partitions(1)
                    .compact()
                    .build();
        }
    
    1
    2
    3
    4
    5
    6
    7
  3. kafka 的配置在KafkaProperties中

  4. @EnableKafka可以开启基于注解的模式

编辑 (opens new window)
上次更新: 2024/05/30, 07:49:34
十九、SpringBoot场景整合-远程调用
二十一、SpringBoot场景整合-Web安全

← 十九、SpringBoot场景整合-远程调用 二十一、SpringBoot场景整合-Web安全→

最近更新
01
tensor比较大小函数
05-30
02
Large Language Models can Deliver Accurate and Interpretable Time Series Anomaly Detection
05-27
03
半监督学习经典方法 Π-model、Mean Teacher
04-10
更多文章>
Theme by Vdoing | Copyright © 2023-2024 Ranger | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式