kafka消息批量处理
只需要三步:
1.添加配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Configuration public class KafkaConsumerConfig { @Autowired private KafkaProperties kafkaProperties;
@Bean ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) { ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory(); configurer.configure(factory, (ConsumerFactory) kafkaConsumerFactory.getIfAvailable(() -> new DefaultKafkaConsumerFactory(kafkaProperties.buildConsumerProperties()))); factory.setBatchListener(true); return factory; } }
|
- 修改消费代码
1 2 3 4
| @KafkaListener(topics = KafkaConstant., groupId = "", containerFactory = "kafkaListenerContainerFactory") public void xxx(List<String> messages, Acknowledgment ack) {
|
3.修改yml文件
1 2 3 4 5 6
| spring.kafka.consumer.max-poll-records: 1000
spring.kafka.consumer.fetch-min-size: 1048576
spring.kafka.consumer.fetch-max-wait: 500
|
注
不知道为什么,网上写开启批量拉取的配置类都是这么写的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Bean public KafkaListenerContainerFactory<?, ?> batchFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setBatchListener(true); return factory; }
@Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString()); ... return props; }
|
ref:Spring Kafka:@KafkaListener 单条或批量处理消息-腾讯云开发者社区-腾讯云
还比如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(concurrency); factory.setBatchListener(true); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory; }
public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(12); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
|
这样不就相当于舍弃了方便的yml配置,自己写配置在代码中了么?