Springboot 使用kafka

2024-04-27 10:39 孙水迪 234

一、配置

在配置文件里配置kafka的地址,生产者和消费者的一些数据,springboot跟根据配置的属性自动装配,一般使用如下配置即可,如果有特殊需求,可根据情况调整配置项

spring:
  kafka:
    bootstrap-servers: localhost:9992
    producer:
      # 应答级别: broker什么时候向生产者发出消息确认收到的ack确认(0:brokers收到了,1:主通过,-1:isr/半数通过)
      acks: 1
      # 控制消息压缩的方式,可选项:none, gzip, snappy, lz4
      compression-type: lz4
      # 消息在缓冲区中保留的时间
      linger-ms: 1000
    consumer:
      client-id: dev
      group-id: abc
      compression-type: lz4
      # 消费者自动提交偏移量
      enable-auto-commit: true
      # 从最近的offset开始消费,就是新加入partition的消息才会被消费
      auto-offset-reset: latest
      properties:
        # 消费回话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance)
        session:
          timeout:
            ms: 120000
        # 消费请求超时时间
        request:
          timeout:
            ms: 180000
      # 批量消费每次最多拉去消息数
      max-poll-records: 50
    listener:
      # 消费端监听的topic不存在时,启动项目报错
      missing-topics-fatal: false
      # 批量消费
      type: batch

二、生产消息

对kafkaTemplate进行封装,将传递的消息序列化并编码后发送,get方法可以提供给消费者调用对接受的消息进行解码,通过规定编码方式,可以让消息在传输过程中不至于因为编码问题成为乱码,编码方式可以自行决定

@Component
public class KafkaTemplate2 {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, Object data) {
        kafkaTemplate.send(topic, encode(JSON.toJSONString(data, SerializerFeature.BrowserCompatible)));
    }

    public <T> T getObject(ConsumerRecord<String, String> record, Class<T> clazz) {
        return JSON.parseObject(decode(record.value()), clazz);
    }

    public <T> List<T> getList(ConsumerRecord<String, String> record, Class<T> clazz) {
        return JSON.parseArray(decode(record.value()), clazz);
    }

    public String encode(String value) {
        return new String(
                Base64.getEncoder().encode(DES.encrypt(value.getBytes(StandardCharsets.UTF_8)))
                , StandardCharsets.UTF_8);

    }

    public String decode(String value) {
        return new String(
                DES.decrypt(Base64.getDecoder().decode(value.getBytes(StandardCharsets.UTF_8)))
                , StandardCharsets.UTF_8);
    }

}

三、消费消息

可通过注解的方式设置消费的topic,收到的消息解码后进行消费

@KafkaListener(topics = TopicUpdate)
public void onMessageTopicUpdate(List<ConsumerRecord<String, String>> records){
    for (ConsumerRecord<String,String> record : records) {
        try {
            Topics kafkaTopic=kafkaTemplate2.getObject(record,Topics.class);
            //todo
        } catch (Exception e){
            e.printStackTrace();
        }
    }
}