Hi Robin,

Apart from what Alexander suggested, I think you could also try the
following first:
Let the job use a "new" Kafka source, which you can achieve by simply
assigning it a different operator ID than before. If you did not set an
ID explicitly before, Flink would have used a hash that it computes by
default.
With a new operator ID, Flink would see this as a new source operator that
does not have previous state (i.e. there would be no partition offsets to
restore from). All other existing operators in the job will still restore
their previous state. With this "new" Kafka source, you can then set the
initial offsets to start consuming from, either by setting a startup
timestamp or by providing a specific map of partition offsets.
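
As a rough sketch of what I mean (untested against your job, and assuming
you are on the newer KafkaSource connector rather than FlinkKafkaConsumer):
the broker address, topic, group ID, uid string, timestamp and offsets
below are all made-up placeholders, so substitute your own.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.TopicPartition;

public class RestartKafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Option A: start from a point in time (placeholder date).
        OffsetsInitializer fromTimestamp = OffsetsInitializer.timestamp(
                Instant.parse("2022-07-01T00:00:00Z").toEpochMilli());

        // Option B: start from explicit per-partition offsets (placeholder values).
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition("my-topic", 0), 1200L);
        offsets.put(new TopicPartition("my-topic", 1), 980L);
        OffsetsInitializer fromOffsets = OffsetsInitializer.offsets(offsets);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")   // placeholder
                .setTopics("my-topic")                // placeholder
                .setGroupId("my-group")               // placeholder
                // Pass fromOffsets here instead to use Option B.
                .setStartingOffsets(fromTimestamp)
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                // A uid that differs from the old source's ID, so no old
                // offsets are matched to this operator on restore.
                .uid("kafka-source-v2")               // placeholder
                .print();

        env.execute("restart-kafka-source-sketch");
    }
}
```

The starting offsets only take effect because the source has no restored
state; if the uid matched the old one, the offsets from the savepoint
would be used instead.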

Also, in order for the job to successfully restore, I think you would need
to set the "--allowNonRestoredState" option when submitting the job.
This essentially tells Flink to ignore the fact that the "old" Kafka source
state is not being restored for the job (since there is no longer a
matching operator to restore those offsets to).

Cheers,
Gordon

On Fri, Jul 8, 2022 at 7:29 AM Alexander Fedulov <alexan...@ververica.com>
wrote:

> Hi Robin,
>
> you should be able to use the State Processor API to modify the operator
> state (sources) and override the offsets manually there. I never tried
> that, but I believe conceptually it should work.
>
> Best,
> Alexander Fedulov
>
