二十、SpringBoot场景整合-消息服务
https://kafka.apache.org/documentation/
# 消息队列-场景
# 1. 异步
如下图所示,使用异步能够提升效率,但和之前的事件机制一样,如果不引入消息,那么后续增加业务需求需要修改原来的代码,但加入消息机制后,我们只需要在新的业务上添加监听消息的代码就能很好的引入。(对增加开启,对修改关闭)
# 2. 解耦
如果库存系统的接口或其他进行了一定的更改,那么我们的订单系统也需要进行更改,而使用消息监听就不会有这种问题
# 3. 削峰
将请求放入消息队列中排队,业务处理服务器根据自身能力一个个处理,而不是一股脑全将请求交给服务器处理,避免服务器宕机
# 4. 缓冲
将所有的日志文件放进消息队列中做缓存,日志分析处理模块有多大的处理能力就处理多少,处理完一批再进行下一批
# 消息队列-Kafka
Kafka是一个消息队列服务器,获取消息叫做订阅或监听
消息的发送者叫做生产者,消息的接受者叫做消费者
# 1. 消息模式
- 点对点模式:
这种模式生产者和消费者都只有一个,生产者发送什么消息,消费者就会接收什么消息,且在确认接受到消息之后会向消息队列发送一个确认信息,让消息队列删除该消息,一条条消息接收,直到消息被清空
- 发布订阅模式:
消息分为不同的主题,会有很多消费者订阅不同的主题,消息队列会将主题里的消息广播给订阅了的消费者,且队列里的消息不清除。如何知道每个消费者此时处理到哪个消息呢?会为每个消费者保存一个偏移量来记录这个信息。
# 2. Kafka工作原理
Broker:服务器结点
Partitions:分区,将数据分散存储
Replicas:副本,在其他机器上进行备份
一份大小为100T的数据被分成了三份,每一份有两个副本,存储情况如下
消费者组:
- 同一个组里的消费者是竞争关系,如图中示例,消费者1、2、3分别消费1分区、2分区、3分区的消息,如果此时在该消费者组中引入第四个消费者,那么这个消费者哪个分区的消息都不能消费
- 不同组之间的消费者就是发布订阅模式
# 3. SpringBoot整合
参照:https://docs.spring.io/spring-kafka/docs/current/reference/html/#preface
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2
3
4
配置
spring.kafka.bootstrap-servers=172.20.128.1:9092
/**
* KafkaAutoConfiguration提供如下功能
* 1、KafkaProperties:kafka的所有配置; 以 spring.kafka开始
* - bootstrapServers: kafka集群的所有服务器地址
* - properties: 参数设置
* - consumer: 消费者
* - producer: 生产者
* ...
* 2、@EnableKafka: 开启Kafka的注解驱动功能
* 3、KafkaTemplate: 收发消息
* 4、KafkaAdmin: 维护主题等...
* 5、@EnableKafka + @KafkaListener 接受消息
* 1)消费者来接受消息,需要有group-id
* 2)收消息使用 @KafkaListener + ConsumerRecord(保存封装的消息,可以取出key、value)
* 3)spring.kafka 开始的所有配置
* 6、核心概念
* 分区: 分散存储,1T的数据分散到N个节点
* 副本: 备份机制,每个小分区的数据都有备份
* 主题: topics; 消息是发送给某个主题
*/
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
修改C:\Windows\System32\drivers\etc\hosts
文件,配置8.130.32.70 kafka
,因为服务器那边是有kafka这个域名对应着结点的。
# 4. 消息发送
@SpringBootTest
class Boot07KafkaApplicationTests {
@Autowired
KafkaTemplate kafkaTemplate;
@Test
void contextLoads() throws ExecutionException, InterruptedException {
StopWatch watch = new StopWatch();
watch.start();
CompletableFuture[] futures = new CompletableFuture[10000];
for (int i = 0; i < 10000; i++) {
CompletableFuture send = kafkaTemplate.send("order", "order.create."+i, "订单创建了:"+i);
futures[i]=send;
}
CompletableFuture.allOf(futures).join();
watch.stop();
System.out.println("总耗时:"+watch.getTotalTimeMillis());
}
}
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final KafkaTemplate<String, String> kafkaTemplate;
public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void someMethod() {
this.kafkaTemplate.send("someTopic", "Hello");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 5. 消息监听
@Component
public class OrderMsgListener {
@KafkaListener(topics = "order",groupId = "order-service")
public void listen(ConsumerRecord record){
System.out.println("收到消息:"+record); //可以监听到发给kafka的新消息,以前的拿不到
}
@KafkaListener(groupId = "order-service-2",topicPartitions = {
@TopicPartition(topic = "order",partitionOffsets = {
@PartitionOffset(partition = "0",initialOffset = "0")
})
})
public void listenAll(ConsumerRecord record){
System.out.println("收到partion-0消息:"+record);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 6. 参数配置
消费者
# 配置序列化器,将对象转json,下面这个是值的序列化器配置,另外还有键的序列化器
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
2
3
4
生产者
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
2
# 7. 自动配置原理
kafka 自动配置在KafkaAutoConfiguration
容器中放了 KafkaTemplate 可以进行消息收发
容器中放了KafkaAdmin 可以进行 Kafka 的管理,比如创建 topic 等
@Bean public NewTopic topic1() { return TopicBuilder.name("thing") .partitions(1) .compact() .build(); }
1
2
3
4
5
6
7kafka 的配置在KafkaProperties中
@EnableKafka可以开启基于注解的模式