V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
qw564518158
V2EX  ›  程序员

kafka 多容器工厂反序列化 kafkaListenerContainerFactory

  •  
  •   qw564518158 · 2020-07-10 14:14:01 +08:00 · 551 次点击
    这是一个创建于 941 天前的主题,其中的信息可能已经有所发展或是发生改变。

    原创

    业务需要批量消费,并且希望每次直接以 List<ModelDTO> 的形式批量拉取数据,然后批量处理。

    private final String topic = "queue_notify";

    @KafkaListener(topics = topic ,containerFactory = "kafkaLiveListenerContainerFactory") public void listen(List pushLiveDTOS) { Long startTime = System.currentTimeMillis(); //批量个推 pushService.notifyLiveGetui (pushLiveDTOS); Long endTime = System.currentTimeMillis(); }

    private final String topic = "queue_push";

    /**
     * Batch listener for the {@code queue_push} topic.
     *
     * <p>The {@code containerFactory} attribute names the container factory bean used for
     * this listener ({@code kafkaListenerContainerFactory}); the received batch is passed to
     * {@code pushService.saveNoPush}.
     *
     * @param mallPushMongoDBS the batch of messages received for one listener invocation
     */
    @KafkaListener(topics = topic, containerFactory = "kafkaListenerContainerFactory")
    public void listen(List<MallPushMongoDB> mallPushMongoDBS) {
        pushService.saveNoPush(mallPushMongoDBS);
    }
    // See the code above.

    分别批量拉取不同 topic 的消息,直接得到对应 DTO 的 List,再进行处理。

    package com.dg.mall.push.config;

    import com.dg.mall.push.kafka.PushJsonDeserializer; import com.dg.mall.push.kafka.LiveJsonDeserializer; import com.dg.mall.push.kafka.listen.LiveNotifyListener; import com.dg.mall.push.kafka.listen.PushListener; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

    import java.util.HashMap; import java.util.Map;

    @Configuration @EnableKafka public class KafkaConsumerConfig {

    // Broker address list; used as the consumer's bootstrap servers.
    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String servers;
    // Whether the consumer auto-commits offsets.
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    // Auto-commit interval; passed as the consumer's auto-commit interval (ms) setting.
    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;
    // Consumer group id shared by both consumer configurations in this class.
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    // Offset reset policy passed to the consumer.
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    // Concurrency applied to each listener container factory.
    @Value("${spring.kafka.consumer.concurrency}")
    private int concurrency;
    // Passed as the consumer's max-poll-records setting (upper bound on a batch).
    @Value("${spring.kafka.consumer.max-consumer-number}")
    private Integer maxConsumerNumber;
    /**
     * Container factory bean for listeners that reference
     * {@code containerFactory = "kafkaListenerContainerFactory"}.
     *
     * <p>Configured as a batch listener factory backed by {@link #consumerFactory()}.
     * NOTE(review): the value type is declared as {@code byte[]} although a custom value
     * deserializer is configured on the consumer factory - confirm the intended generics.
     *
     * @return the configured batch listener container factory
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, byte[]>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, byte[]> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.getContainerProperties().setPollTimeout(1500);
        // Deliver records to the listener in batches.
        containerFactory.setBatchListener(true);
        // Number of concurrent consumers.
        containerFactory.setConcurrency(concurrency);
        containerFactory.setConsumerFactory(consumerFactory());
        return containerFactory;
    }
    
    /**
     * Creates the consumer factory used by {@code kafkaListenerContainerFactory()}.
     *
     * <p>Keys are read with {@link StringDeserializer}; values are read with
     * {@code PushJsonDeserializer} (the class imported by this configuration - the
     * lower-case {@code pushJsonDeserializer} spelling does not resolve to any class).
     *
     * <p>NOTE(review): the raw {@code DefaultKafkaConsumerFactory} is kept on purpose; the
     * declared value type {@code byte[]} presumably does not match what the custom
     * deserializer produces, so adding type arguments here may not compile - confirm and
     * align the generics across this class.
     *
     * @return a consumer factory built from {@link #consumerConfigs()}
     */
    public ConsumerFactory<String, byte[]> consumerFactory() {
        return new DefaultKafkaConsumerFactory(consumerConfigs(), new StringDeserializer(), new PushJsonDeserializer());
    }
    
    
    /**
     * Builds the Kafka consumer properties used by {@link #consumerFactory()}.
     *
     * @return a new mutable map of consumer configuration entries
     */
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Value deserializer: the custom PushJsonDeserializer (class name must match the
        // import; the lower-case spelling does not compile).
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, PushJsonDeserializer.class);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 40000);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        // Upper bound on records per poll. The original note mentions 100; the actual value
        // comes from spring.kafka.consumer.max-consumer-number.
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxConsumerNumber);
        return propsMap;
    }
    
    /**
     * Container factory bean for listeners that reference
     * {@code containerFactory = "kafkaLiveListenerContainerFactory"}.
     *
     * <p>Configured as a batch listener factory backed by {@link #liveConsumerFactory()}.
     * NOTE(review): the value type is declared as {@code byte[]} although a custom value
     * deserializer is configured on the consumer factory - confirm the intended generics.
     *
     * @return the configured batch listener container factory
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, byte[]>> kafkaLiveListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, byte[]> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.getContainerProperties().setPollTimeout(1500);
        // Deliver records to the listener in batches.
        containerFactory.setBatchListener(true);
        // Number of concurrent consumers.
        containerFactory.setConcurrency(concurrency);
        containerFactory.setConsumerFactory(liveConsumerFactory());
        return containerFactory;
    }
    
    /**
     * Creates the consumer factory used by {@code kafkaLiveListenerContainerFactory()}.
     *
     * <p>Keys are read with {@link StringDeserializer}; values are read with
     * {@code LiveJsonDeserializer}, the class this configuration imports
     * ({@code PushLiveJsonDeserializer} is not imported anywhere in this file).
     *
     * <p>NOTE(review): the raw {@code DefaultKafkaConsumerFactory} is kept on purpose; the
     * declared value type is {@code byte[]} while {@code LiveJsonDeserializer} produces
     * {@code PushLiveDTO}, so adding type arguments here would not compile - align the
     * generics across this class when convenient.
     *
     * @return a consumer factory built from {@link #liveConsumerConfigs()}
     */
    public ConsumerFactory<String, byte[]> liveConsumerFactory() {
        return new DefaultKafkaConsumerFactory(liveConsumerConfigs(), new StringDeserializer(), new LiveJsonDeserializer());
    }
    
    
    /**
     * Builds the Kafka consumer properties used by {@link #liveConsumerFactory()}.
     *
     * @return a new mutable map of consumer configuration entries
     */
    public Map<String, Object> liveConsumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Value deserializer: the custom LiveJsonDeserializer (class name must match the
        // import; the lower-case spelling does not compile).
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LiveJsonDeserializer.class);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 40000);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        // Upper bound on records per poll. The original note mentions 100; the actual value
        // comes from spring.kafka.consumer.max-consumer-number.
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxConsumerNumber);
        return propsMap;
    }
    
    /**
     * Registers the {@code PushListener} as a bean.
     *
     * @return a new {@code PushListener} instance
     */
    @Bean
    public PushListener listener() {
        PushListener pushListener = new PushListener();
        return pushListener;
    }
    
    
    /**
     * Registers the live-notification listener as a bean.
     *
     * <p>Uses {@code LiveNotifyListener}, the listener class this configuration imports;
     * no {@code NotifyListener} is imported anywhere in this file.
     *
     * @return a new {@code LiveNotifyListener} instance
     */
    @Bean
    public LiveNotifyListener livelistener() {
        return new LiveNotifyListener();
    }
    

    } //反序列化 package com.dg.mall.push.kafka;

    import com.dg.mall.push.model.message.PushLiveDTO; import com.gexin.fastjson.JSON; import org.apache.kafka.common.serialization.Deserializer;

    import java.util.Map;

    public class LiveJsonDeserializer implements Deserializer<PushLiveDTO> {

    /**
     * No-op: this deserializer reads nothing from the supplied configuration.
     *
     * @param configs consumer configuration entries (unused)
     * @param isKey   whether this instance is used for keys (unused)
     */
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    
    }
    
    /**
     * Parses a JSON payload into a {@code PushLiveDTO}.
     *
     * @param topic the topic the record came from (unused)
     * @param data  the raw record value; may be {@code null}
     * @return the parsed DTO, or {@code null} when {@code data} is {@code null}
     */
    @Override
    public PushLiveDTO deserialize(String topic, byte[] data) {
        // Kafka may pass null data; return null instead of letting the JSON parser
        // throw inside the consumer poll loop.
        if (data == null) {
            return null;
        }
        return JSON.parseObject(data, PushLiveDTO.class);
    }
    
    /**
     * No-op: the method body is empty, so nothing is released here.
     */
    @Override
    public void close() {
    
    }
    

    } //序列化 package com.dg.mall.push.kafka;

    import com.dg.mall.push.model.message.MallPushMongoDB; import com.gexin.fastjson.JSON; import org.apache.kafka.common.serialization.Serializer;

    import java.util.Map;

    public class MallPushJsonSerializer implements Serializer<MallPushMongoDB> {

    /**
     * No-op: this serializer reads nothing from the supplied configuration.
     *
     * @param configs client configuration entries (unused)
     * @param isKey   whether this instance is used for keys (unused)
     */
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    
    }
    
    /**
     * Encodes a {@code MallPushMongoDB} message as JSON bytes.
     *
     * @param topic the destination topic (unused)
     * @param data  the message to encode
     * @return the bytes produced by {@code JSON.toJSONBytes} for {@code data}
     */
    @Override
    public byte[] serialize(String topic, MallPushMongoDB data) {
        final byte[] payload = JSON.toJSONBytes(data);
        return payload;
    }
    
    /**
     * No-op: the method body is empty, so nothing is released here.
     */
    @Override
    public void close() {
    
    }
    

    }

    总结 消费

    @KafkaListener(topics = topic ,containerFactory = "kafkaLiveListenerContainerFactory"):监听消费时一定要通过 containerFactory 指定对应的容器工厂。容器工厂里配置了值的反序列化器,一般默认是 String 反序列化,这里需要替换成我们自己为 DTO 编写的反序列化器(例如 LiveJsonDeserializer),再来进行反序列化。

    最后的最后监听消费批量获取的时候

    数据就这样全部获取到了。

    end!

    1 条回复    2020-07-15 16:33:55 +08:00
    qw564518158
        1
    qw564518158  
    OP
       2020-07-15 16:33:55 +08:00
    竟然没有人,尴尬了
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   广告投放   ·   实用小工具   ·   1429 人在线   最高记录 5497   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 45ms · UTC 18:26 · PVG 02:26 · LAX 10:26 · JFK 13:26
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.