Hi Robin,

Apart from what Alexander suggested, I think you could also try the following first: let the job use a "new" Kafka source, which you can achieve by simply assigning it a different operator ID than before. If you previously did not set an ID explicitly, it would by default have been a hash computed by Flink. With a new operator ID, Flink sees the source as a brand-new operator that has no previous state (i.e. there would be no partition offsets to restore). All other existing operators in the job will still restore their previous state. With this "new" Kafka source, you can then set the initial offsets to start consuming from, either by setting a startup timestamp or a specific map of partition offsets.
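To make that concrete, here is a rough sketch of the idea using the newer KafkaSource API (the same applies to the legacy FlinkKafkaConsumer via setStartFromTimestamp / setStartFromSpecificOffsets). The broker address, topic, group ID, UID string, and timestamp below are all placeholders, not from your job:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RewindKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")          // placeholder
                .setTopics("my-topic")                      // placeholder
                .setGroupId("my-group")                     // placeholder
                // Start from a timestamp instead of restored state;
                // OffsetsInitializer.offsets(Map<TopicPartition, Long>)
                // would let you pin exact per-partition offsets instead.
                .setStartingOffsets(OffsetsInitializer.timestamp(1657000000000L))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                // A UID different from the old one (or from the auto-generated
                // hash, if you never set one) makes Flink treat this source as
                // a new operator with no state to restore.
                .uid("kafka-source-v2");

        env.execute("rewind-kafka-job");
    }
}
```

This is only a sketch of the approach and needs the flink-connector-kafka dependency and a cluster to actually run.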
Also, in order for the job to successfully restore, I think you would need to set the "--allowNonRestoredState" option when submitting the job. This essentially tells Flink to ignore the fact that the "old" Kafka source state is not being restored for the job (since there is no longer a matching operator to restore those offsets to).

Cheers,
Gordon

On Fri, Jul 8, 2022 at 7:29 AM Alexander Fedulov <alexan...@ververica.com> wrote:
> Hi Robin,
>
> you should be able to use the State Processor API to modify the operator
> state (sources) and override the offsets manually there. I never tried
> that, but I believe conceptually it should work.
>
> Best,
> Alexander Fedulov
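P.S. For reference, the submission with that flag would look roughly like the following (savepoint path and jar name are placeholders):

```shell
# Resume from the existing savepoint while telling Flink it is OK that
# the old Kafka source state has no matching operator anymore.
# -n is the short form of --allowNonRestoredState.
flink run \
  -s hdfs:///savepoints/savepoint-abc123 \
  --allowNonRestoredState \
  my-job.jar
```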