Hi Jean-Marc,

(Thanks, Yaroslav, for your excellent answer 😊)

Before I come back to your question, I would like to mention an important 
consideration that you might have overlooked:
If you replicate your Kafka cluster by means of e.g. MirrorMaker, the offsets 
per topic partition on the replicated cluster will generally differ from those 
on the original cluster!
Hence, reusing the original offsets will not work and may even lead to data 
loss or duplication …
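If you do replicate with MirrorMaker 2, one mitigation worth knowing about: its checkpoint connector mirrors committed consumer-group offsets, and `RemoteClusterUtils.translateOffsets` can map them to the replica cluster's offset space. A minimal, untested sketch (broker address, cluster alias, and group name are hypothetical, and it requires the MM2 checkpoint topic to exist on the target cluster):

```java
import java.time.Duration;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

public class TranslateOffsets {
    public static void main(String[] args) throws Exception {
        // Connection properties for the *target* (replica) cluster; value is hypothetical.
        Map<String, Object> props = Map.of("bootstrap.servers", "replica-kafka:9092");

        // "primary" is the source-cluster alias configured in MirrorMaker 2,
        // "my-flink-group" the consumer group whose offsets were mirrored.
        Map<TopicPartition, OffsetAndMetadata> translated =
                RemoteClusterUtils.translateOffsets(
                        props, "primary", "my-flink-group", Duration.ofSeconds(30));

        translated.forEach((tp, offset) ->
                System.out.println(tp + " -> " + offset.offset()));
    }
}
```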

In addition to what Yaroslav has written:

  *   If you are only ever interested in the latest committed 
checkpoint/savepoint, you can enable your source connectors to commit the 
offsets per consumer group to the __consumer_offsets topic, and then read them 
back from there.
(cf. 
https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#consumer-offset-committing
 )
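A rough sketch of what that source setup could look like (untested; broker, topic, and group names are hypothetical). Note that Flink does not rely on these committed offsets for fault tolerance — they only serve as a bootstrap point and for monitoring:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("kafka:9092")
        .setTopics("my-topic")
        .setGroupId("my-flink-job")
        // start from the offsets committed to __consumer_offsets;
        // fall back to EARLIEST if no committed offset exists yet
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        // commit the checkpointed offsets back to Kafka on each successful checkpoint
        .setProperty("commit.offsets.on.checkpoint", "true")
        .build();
```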

  *   Also, when you restart your job from the original savepoint, change the 
consumer group of your source connectors, so that the Flink source connector 
uses your configuration instead of the offsets recorded in the savepoint (no 
need to patch the savepoint).

Also set 
https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/config/#execution-state-recovery-ignore-unclaimed-state
 , otherwise Flink will complain on initialization about state in the 
savepoint that is not claimed by any operator.


Back to your original request: I’m not sure which scenario you have in mind.

  *   If this is a one-time migration to another Kafka cluster, you have the 
option of copying all files over to the new cluster (I’ve never done this 
myself)
  *   If your scenario is failover in case of a disruption
     *   You can create a federated (stretched) cluster across two data centers
     *   Configure replication such that at least one replica exists in every 
data center
     *   Configure consumers to prefer the closer data center
     *   Similarly for producers (the leader partition needs to be in the right 
data center)
     *   This would resolve the offset question for good …

I hope this helps

Sincere greetings

Thias




From: Yaroslav Tkachenko <[email protected]>
Sent: Wednesday, April 1, 2026 8:18 PM
To: [email protected]
Subject: [External] Re: Q: Trying to read the kafka offset stored in a savepoint


Hi JM,

Here's the snippet for you: 
https://gist.github.com/sap1ens/6b894c52279b6d2339e50dde35070013

First, you need to get an operator UID hash for your Kafka source. 
ReadSavepoint.java shows how you can do that using the savepoint_metadata table 
function.

ReadKafkaSourceState.java shows how to use the State Processor API to read 
KafkaPartitionSplit stored in a savepoint.

You'd get an output like this (my consumer read the messages up to offset 199 
for partition 0):

> KafkaPartitionSplitState{topic='loadtest.json', partition=0, 
> startingOffset=200, stoppingOffset=NONE}

Note: I'm using Flink 2.1.1 and Kafka connector 4.0.1-2.0, but I hope the 
changes are minimal for 1.x (if any).
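[Editor's note: for Flink 1.20 and the 3.x Kafka connector, a rough equivalent of the gist might look like the following untested sketch. The state name "SourceReaderState" and the split serializer are internal details of the source operator that may differ across versions, and the UID-hash placeholder must be filled in from the savepoint metadata.]

```java
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SavepointReader savepoint =
        SavepointReader.read(env, "/path/to/savepoint", new HashMapStateBackend());

// The Kafka source stores its splits as serialized bytes in a list state
// named "SourceReaderState" (an internal detail of Flink's SourceOperator).
DataStream<byte[]> rawSplits = savepoint.readListState(
        OperatorIdentifier.forUidHash("<kafka-source-uid-hash>"),
        "SourceReaderState",
        PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);

rawSplits
        // each entry is version-prefixed; unwrap and deserialize the split
        .map(bytes -> SimpleVersionedSerialization
                .readVersionAndDeSerialize(new KafkaPartitionSplitSerializer(), bytes))
        .map(split -> split.getTopic() + "-" + split.getPartition()
                + " @ " + split.getStartingOffset())
        .print();

env.execute("read-kafka-offsets");
```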

Hope this helps.

On Wed, Apr 1, 2026 at 4:42 AM Jean-Marc Paulin 
<[email protected]<mailto:[email protected]>> wrote:
Hi,

We are using Flink 1.20.3 and the Flink Kafka connector 3.4.0-1.20. It's a 
streaming application using a Kafka source and sink.

I was wondering if anyone here has a tip/sample for reading the content of a 
savepoint to extract the Kafka offsets per partition.

The scenario is around resuming from a replicated cluster. We plan to replicate 
Kafka as well as the savepoints. What we do not know is whether the latest 
savepoint we replicated contains the Kafka offsets that Flink would resume 
from, or whether we need to resume from a previous savepoint.

Thanks for any tips.

JM

This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.
