对于kafka-binder来说,设置autoCommitOffset为false.然后在listen中手动确认
1 2 3 4 5
| @StreamListener(Sink.INPUT) void listen(@Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment){ acknowledgment.acknowledge(); }
|
需要注意的是autoCommitOffset的设置位置.
1 2
| spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false#应该在这里设置 spring.cloud.stream.bindings.input.consumer.autoCommitOffset=false#这里设置是无效的,获取Acknowledgment时会是null
|