Hi,
are you using KafkaIO? If so, you can enable offset commits in bundle
finalization via [1]. Note, on the other hand, that KafkaIO stores
offsets in its checkpoint, so - provided you run your Beam pipeline on a
runner with checkpointing enabled - it should not be necessary to commit
offsets to Kafka just for the sake of exactly-once processing. That is
guaranteed even without it.
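To illustrate, here is a minimal sketch of what enabling that option
could look like (topic, broker address, and group id are placeholders,
and I haven't run this against a cluster):

```java
import java.util.Collections;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaReadExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(KafkaIO.<String, String>read()
        .withBootstrapServers("broker-1:9092")        // placeholder address
        .withTopic("my-topic")                        // placeholder topic
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // Commit consumed offsets back to Kafka when a bundle is
        // finalized; this requires a group.id in the consumer config.
        .commitOffsetsInFinalize()
        .withConsumerConfigUpdates(
            Collections.singletonMap("group.id", "my-group")) // placeholder
        .withoutMetadata());

    // ... downstream processing / external API calls here ...

    p.run().waitUntilFinish();
  }
}
```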
Please don't hesitate to ask if you have any more questions.
Best,
Jan
[1]
https://beam.apache.org/releases/javadoc/2.32.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--
On 9/22/21 7:46 AM, Wu, Huijun wrote:
Hi All,
I have encountered a problem which seems common, but I couldn't find
any working solution online (Stack Overflow, Google search), so I am
asking for help here.
I created a simple Apache Beam streaming pipeline which reads data from
Kafka, does some processing, and persists the result by calling an
external service's API. I want to make sure no data is lost during a
pipeline restart or failure, so I want to manually commit the record
offset to Kafka after I successfully call the API at the end of a
specific DoFn execution.
From my previous Kafka experience, I know that I can manually commit
record offsets to Kafka using the consumer API below:

consumer.commitSync(currentOffsets);
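For context, the plain Kafka consumer pattern I mean is roughly the
following (topic, group, and broker names are placeholders; this is a
sketch, not my actual pipeline code):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ManualCommitExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
    // Disable auto commit so offsets advance only after processing.
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("my-topic"));
      Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
      while (true) {
        for (ConsumerRecord<String, String> record :
            consumer.poll(Duration.ofMillis(500))) {
          // ... call the external service's API here ...
          // Record the next offset to consume for this partition.
          currentOffsets.put(
              new TopicPartition(record.topic(), record.partition()),
              new OffsetAndMetadata(record.offset() + 1));
        }
        // Commit only the offsets of records actually processed.
        consumer.commitSync(currentOffsets);
      }
    }
  }
}
```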
There is a setting to turn off auto commit in the KafkaIO setup;
however, I didn't find any working solution or exposed interface for
manually committing offsets in Apache Beam, as there seems to be no way
to access the consumer inside a DoFn. The project I am working on uses
Apache Beam 2.16.0 for historical reasons, but I am happy to upgrade it
if a newer version provides a working solution.
I would really appreciate it if an expert could kindly share some hints
with sample code.
Best Regards,