Kafka 的消費(fèi)者可以使用兩種方式來提交消費(fèi)位移(offset):自動(dòng)提交和手動(dòng)提交。自動(dòng)提交是由 Kafka 客戶端自動(dòng)定期提交位移,而手動(dòng)提交則需要應(yīng)用程序顯式地調(diào)用 API 來提交位移。手動(dòng)提交位移的方式可以更精細(xì)地控制消費(fèi)位移,以及避免因自動(dòng)提交位移而產(chǎn)生的數(shù)據(jù)丟失或重復(fù)消費(fèi)等問題。
下面是使用 Kafka Java API 手動(dòng)提交位移的一些示例代碼:
1.啟用手動(dòng)提交位移:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false"); // 關(guān)閉自動(dòng)提交位移
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
在創(chuàng)建 KafkaConsumer 對象時(shí),將 enable.auto.commit 屬性設(shè)置為 false,以關(guān)閉自動(dòng)提交位移的功能。
2.手動(dòng)提交位移:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 處理消息
}
consumer.commitSync(); // 手動(dòng)提交位移
}
在消費(fèi)消息后,調(diào)用 commitSync() 方法來手動(dòng)提交位移。如果需要批量提交位移,可以使用 commitSync(Map<topicpartition, offsetandmetadata=""> offsets) 方法來提交指定的分區(qū)和位移信息。
需要注意的是,手動(dòng)提交位移需要在適當(dāng)?shù)臅r(shí)機(jī)進(jìn)行提交,以確保數(shù)據(jù)不會(huì)丟失或重復(fù)消費(fèi)。一般來說,可以在消費(fèi)一批消息后,或者在處理完一段業(yè)務(wù)邏輯后,再進(jìn)行位移提交。同時(shí),還需要注意位移的提交順序,以保證數(shù)據(jù)的一致性。