Consumer 的消费位移 : 记录 Consumer 下一条消息的消费位移
提交位移 (Committing Offsets) : Consumer 向 Kafka 汇报位移数据
Consumer API 的提交位移的方法 :
| 提交位移 | 自动提交 | 配置 | enable.auto.commit = true |
|---|---|---|---|
| 手动提交 | 同步提交 | KafkaConsumer.commitSync | |
| 异步提交 | KafkaConsumer.commitAsync | ||
| 细化位移提交 | commitSync(Map | ||
commitAsync(Map |
Consumer 参数 :
enable.auto.commit = true : 自动提交位移auto.commit.interval.ms (默认值是 5 秒) : Kafka 每 5 秒自动提交一次位移自动提交位移 :
设置自动提交位移 :
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));while (true) {ConsumerRecords records = consumer.poll(100);for (ConsumerRecord record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}
}
enable.auto.commit = false : 手动提交位移
手动提交位移 :
commitSync() :
while (true) {// 返回最新位移。一直等位移提交后才返回 (同步操作)ConsumerRecords records =consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息try {consumer.commitSync();} catch (CommitFailedException e) {handle(e); // 处理提交失败异常}
}
commitAsync() :
while (true) {ConsumerRecords records = consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息consumer.commitAsync((offsets, exception) -> {if (exception != null)handle(exception);});
}
异步无阻塞式 :
// 实现异步无阻塞式的位移管理,保证 Consumer 位移的正确性
try {while (true) {ConsumerRecords records = consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息commitAysnc(); // 使用异步提交规避阻塞}
} catch (Exception e) {handle(e); // 处理异常
} finally {try {consumer.commitSync(); // 最后一次提交使用同步阻塞式提交} finally {consumer.close();}
}
更精细的位移管理 :
commitSync(Map) commitAsync(Map) // 创建 Map 对象,保存 Consumer 消费要提交的分区位移
private Map offsets = new HashMap<>();
int count = 0;
//...
while (true) {ConsumerRecords records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord record: records) {process(record); // 处理消息// 构造要提交的位移值offsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1);// 每 100 条消息提交一次位移if(count % 100 == 0){consumer.commitAsync(offsets, null); // 回调处理逻辑是 null}count++;}
}