Kafka batch message processing

Only three steps are needed:
1. Add a configuration class

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class KafkaConsumerConfig {

    @Autowired
    private KafkaProperties kafkaProperties;

    @Bean
    ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory.getIfAvailable(
                () -> new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties())));
        // Enable batch consumption
        factory.setBatchListener(true);
        return factory;
    }
}
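
Since the listener in step 2 takes an Acknowledgment parameter, the container must also run in a manual ack mode, otherwise Spring Kafka cannot supply that argument. The examples quoted further down set MANUAL_IMMEDIATE on their factories; one way to keep that setting in this configuration class is to add a line before returning the factory (ContainerProperties lives in org.springframework.kafka.listener):

    // needed for the Acknowledgment parameter used in step 2 (same MANUAL_IMMEDIATE setting as in the examples quoted below)
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);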
2. Modify the consumer code
@KafkaListener(topics = KafkaConstant.,
        groupId = "",
        containerFactory = "kafkaListenerContainerFactory")
public void xxx(List<String> messages, Acknowledgment ack) {
    // process the whole batch of messages, then commit the offsets manually
    ack.acknowledge();
}
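
If the partition or offset of each record is needed, a batch listener can also receive the records themselves rather than just the payloads (ConsumerRecord comes from org.apache.kafka.clients.consumer). A minimal sketch, with a placeholder topic and group id that are not from the original:

@KafkaListener(topics = "demo-topic", groupId = "demo-group",
        containerFactory = "kafkaListenerContainerFactory")
public void onBatch(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
    for (ConsumerRecord<String, String> record : records) {
        // each record carries its partition, offset and value
        System.out.printf("partition=%d, offset=%d, value=%s%n",
                record.partition(), record.offset(), record.value());
    }
    // one manual commit for the whole batch
    ack.acknowledge();
}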

3. Modify the yml file

# Maximum number of records returned by a single poll() call. By default a consumer gets at most 500 messages per poll.
spring.kafka.consumer.max-poll-records: 1000
# Minimum amount of data the consumer asks the broker to return per fetch request. The broker waits until at least 1MB has accumulated before responding, which reduces network round trips and improves throughput. The default is 1 byte, i.e. respond immediately.
spring.kafka.consumer.fetch-min-size: 1048576
# Maximum time the broker will wait to satisfy fetch-min-size before responding. If the data has not reached fetch-min-size, it returns whatever has accumulated after at most 500ms, so waiting for more data does not add excessive latency. The default is 500ms.
spring.kafka.consumer.fetch-max-wait: 500
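
Two listener-level properties are worth adding here as well (a sketch; spring.kafka.listener.type requires Spring Boot 2.3 or later, so check your version). The second is the yml equivalent of the MANUAL_IMMEDIATE setting mentioned under step 1:

# run all listeners in batch mode; on recent Spring Boot versions this alone can replace the setBatchListener(true) call
spring.kafka.listener.type: batch
# manual ack mode, required because the listener method takes an Acknowledgment
spring.kafka.listener.ack-mode: manual_immediate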

I don't know why, but the configuration classes for enabling batch consumption that you find online are all written like this:

@Bean
public KafkaListenerContainerFactory<?, ?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true); // <<<<<<<<<<<<<<<<<<<<<<<<<
    return factory;
}

@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
    ...
    return props;
}

Ref: Spring Kafka: @KafkaListener single-record or batch message processing, Tencent Cloud Developer Community
And here is another example:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(concurrency);
    factory.setBatchListener(true);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    // factory.getContainerProperties().setPollTimeout(15000);
    return factory;
}

public Map<String, Object> consumerConfigs() {
    Map<String, Object> propsMap = new HashMap<>(12);
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);

Doesn't that amount to throwing away the convenient yml configuration and writing the consumer properties by hand in code instead?
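
For completeness, one middle ground that keeps the yml-driven settings is to let Spring Boot build the ConsumerFactory from application.yml and only flip the batch flag in code. A minimal sketch, assuming the auto-configured ConsumerFactory<Object, Object> bean is available for injection (which is the case when you have not declared your own):

@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> batchFactory(
        ConsumerFactory<Object, Object> consumerFactory) { // auto-configured by Spring Boot from application.yml
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // only the batch flag is set in code; every other consumer property still comes from yml
    factory.setBatchListener(true);
    return factory;
}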