一、配置
在配置文件里配置kafka的地址,生产者和消费者的一些数据,springboot跟根据配置的属性自动装配,一般使用如下配置即可,如果有特殊需求,可根据情况调整配置项
spring:
kafka:
bootstrap-servers: localhost:9992
producer:
# 应答级别: broker什么时候向生产者发出消息确认收到的ack确认(0:brokers收到了,1:主通过,-1:isr/半数通过)
acks: 1
# 控制消息压缩的方式,可选项:none, gzip, snappy, lz4
compression-type: lz4
# 消息在缓冲区中保留的时间
linger-ms: 1000
consumer:
client-id: dev
group-id: abc
compression-type: lz4
# 消费者自动提交偏移量
enable-auto-commit: true
# 从最近的offset开始消费,就是新加入partition的消息才会被消费
auto-offset-reset: latest
properties:
# 消费回话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance)
session:
timeout:
ms: 120000
# 消费请求超时时间
request:
timeout:
ms: 180000
# 批量消费每次最多拉去消息数
max-poll-records: 50
listener:
# 消费端监听的topic不存在时,启动项目报错
missing-topics-fatal: false
# 批量消费
type: batch
二、生产消息
对kafkaTemplate进行封装,将传递的消息序列化并编码后发送,get方法可以提供给消费者调用对接受的消息进行解码,通过规定编码方式,可以让消息在传输过程中不至于因为编码问题成为乱码,编码方式可以自行决定
@Component
public class KafkaTemplate2 {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, Object data) {
kafkaTemplate.send(topic, encode(JSON.toJSONString(data, SerializerFeature.BrowserCompatible)));
}
public <T> T getObject(ConsumerRecord<String, String> record, Class<T> clazz) {
return JSON.parseObject(decode(record.value()), clazz);
}
public <T> List<T> getList(ConsumerRecord<String, String> record, Class<T> clazz) {
return JSON.parseArray(decode(record.value()), clazz);
}
public String encode(String value) {
return new String(
Base64.getEncoder().encode(DES.encrypt(value.getBytes(StandardCharsets.UTF_8)))
, StandardCharsets.UTF_8);
}
public String decode(String value) {
return new String(
DES.decrypt(Base64.getDecoder().decode(value.getBytes(StandardCharsets.UTF_8)))
, StandardCharsets.UTF_8);
}
}
三、消费消息
可通过注解的方式设置消费的topic,收到的消息解码后进行消费
@KafkaListener(topics = TopicUpdate)
public void onMessageTopicUpdate(List<ConsumerRecord<String, String>> records){
for (ConsumerRecord<String,String> record : records) {
try {
Topics kafkaTopic=kafkaTemplate2.getObject(record,Topics.class);
//todo
} catch (Exception e){
e.printStackTrace();
}
}
}