Hi JM,
Here's the snippet for you:
https://gist.github.com/sap1ens/6b894c52279b6d2339e50dde35070013
First, you need to get an operator UID hash for your Kafka
source. ReadSavepoint.java shows how you can do that using
the savepoint_metadata table function.
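In case you don't want to open the gist right away, the metadata lookup is essentially a one-liner in Flink SQL. Treat this as a sketch: the path is a placeholder and I'm quoting the column names from memory of the 2.1 docs, so double-check them against your version:

```sql
-- Flink SQL (2.1+): list the operators in a savepoint to find the
-- Kafka source's UID hash. '/path/to/savepoint' is a placeholder.
SELECT `operator-name`, `operator-uid`, `operator-uid-hash`
FROM TABLE(savepoint_metadata('/path/to/savepoint'));
```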
ReadKafkaSourceState.java shows how to use the State Processor API to
read KafkaPartitionSplit stored in a savepoint.
You'd get output like this (my consumer had read messages up to offset
199 in partition 0):
> KafkaPartitionSplitState{topic='loadtest.json', partition=0,
startingOffset=200, stoppingOffset=NONE}
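For anyone skimming, the core of the state read looks roughly like the sketch below. This is my understanding of the public APIs, with a placeholder savepoint path and UID hash — the gist is the version I actually ran. It relies on the fact that the Kafka source stores its splits in operator list state named "SourceReaderState" as version-prefixed byte arrays:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadKafkaSourceStateSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder path; point this at a real savepoint directory.
        SavepointReader savepoint =
                SavepointReader.read(env, "/path/to/savepoint");

        // The Source operator keeps its splits in list state named
        // "SourceReaderState", with each element stored as byte[].
        savepoint
                .readListState(
                        // The UID hash you looked up via savepoint_metadata.
                        OperatorIdentifier.forUidHash("<operator-uid-hash>"),
                        "SourceReaderState",
                        TypeInformation.of(byte[].class))
                .map(bytes ->
                        // Each element is a version-prefixed serialized split;
                        // the Kafka connector's serializer decodes it.
                        SimpleVersionedSerialization
                                .readVersionAndDeSerialize(
                                        new KafkaPartitionSplitSerializer(), bytes)
                                .toString())
                .print();

        env.execute("read-kafka-source-state");
    }
}
```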
Note: I'm using Flink 2.1.1 and Kafka connector 4.0.1-2.0, but the changes
needed for 1.x should be minimal (if any).
Hope this helps.
On Wed, Apr 1, 2026 at 4:42 AM Jean-Marc Paulin <[email protected]> wrote:
> Hi,
>
> We are using Flink 1.20.3 and the Flink Kafka connector 3.4.0-1.20. It's a
> streaming application using a Kafka source and sink.
>
> Was wondering if anyone here has a tip/sample for reading the content of a
> savepoint to extract the Kafka offset per partition.
>
> The scenario is around resuming from a replicated cluster. We plan to
> replicate Kafka as well as the savepoints. What we don't know is whether
> the latest savepoint we replicated contains the Kafka offsets that Flink
> would resume from, or if we need to resume from a previous savepoint.
>
> Thanks for any tips.
>
> JM
>