Hi Bastien,

Thanks for asking. I didn't find any call to setStartFromGroupOffsets() within
Flink on the master branch. Could you please point out the code where the
committed offset is used as the default?

W.r.t. the new KafkaSource, if OffsetsInitializer.committedOffsets() is used,
an exception will be thrown at runtime when there is no committed offset. That
is useful when the user intends to read from the committed offset but something
is wrong. It would feel odd as the default, though, because users starting new
jobs with default settings would then hit an exception.
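To make the trade-off concrete, here is a toy model in plain Java (no Flink
dependency) of how the three starting-offset choices resolve for a single
partition. The Strategy enum and resolve() method are hypothetical
illustrations, not Flink classes; they only mirror the behavior described
above: earliest() ignores committed offsets, committedOffsets() fails when the
group has nothing committed, and committedOffsets(OffsetResetStrategy.EARLIEST)
falls back to the earliest offset.

```java
import java.util.Optional;

public class StartingOffsetModel {
    // Hypothetical stand-ins for the three options discussed; NOT Flink classes.
    public enum Strategy { EARLIEST, COMMITTED_STRICT, COMMITTED_OR_EARLIEST }

    /**
     * Toy resolution of the starting offset for one partition.
     *
     * @param committed offset committed by the consumer group, if any
     * @param earliest  earliest offset still available in the partition
     */
    public static long resolve(Strategy strategy, Optional<Long> committed, long earliest) {
        switch (strategy) {
            case EARLIEST:
                // Like earliest(): committed offsets are ignored entirely.
                return earliest;
            case COMMITTED_STRICT:
                // Like committedOffsets(): no committed offset means a runtime failure.
                return committed.orElseThrow(() ->
                        new IllegalStateException("no committed offset for this group/partition"));
            case COMMITTED_OR_EARLIEST:
                // Like committedOffsets(EARLIEST): fall back when nothing is committed.
                return committed.orElse(earliest);
            default:
                throw new AssertionError("unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        Optional<Long> committed = Optional.of(42L);
        Optional<Long> none = Optional.empty();
        long earliest = 0L;

        System.out.println(resolve(Strategy.EARLIEST, committed, earliest));
        System.out.println(resolve(Strategy.COMMITTED_OR_EARLIEST, committed, earliest));
        System.out.println(resolve(Strategy.COMMITTED_OR_EARLIEST, none, earliest));
        try {
            // A brand-new group with the strict option: this is the failure case.
            resolve(Strategy.COMMITTED_STRICT, none, earliest);
        } catch (IllegalStateException e) {
            System.out.println("new group fails: " + e.getMessage());
        }
    }
}
```

The last call is why the strict variant is a poor default: every job started
with a fresh group id would fail on its first run.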

Best regards,
Jing

On Tue, Jun 14, 2022 at 4:15 PM bastien dine <bastien.d...@gmail.com> wrote:

> Hello everyone,
>
> Does anyone know why the starting offset behaviour has changed in the new
> KafkaSource?
>
> It now starts from the earliest offset (see KafkaSourceBuilder). The docs say:
> "If offsets initializer is not specified, OffsetsInitializer.earliest() will
> be used by default." See:
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset
>
> Previously, the old FlinkKafkaConsumer started from the committed offsets
> (i.e. the setStartFromGroupOffsets() method), which matches this behaviour
> in the new KafkaSource:
> OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
>
> This change can cause serious trouble if users overlook it when migrating
> from the old FlinkKafkaConsumer to the new KafkaSource.
>
> Regards,
> Bastien
>
> ------------------
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
> bastiendine.io
>
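For anyone migrating, one way to avoid depending on the default at all is to
set the starting offsets explicitly on the builder. This is a sketch that
assumes flink-connector-kafka (and its Kafka client dependency) is on the
classpath; the broker address, topic, and group id are placeholders. The
EARLIEST fallback follows the mapping suggested above; the old consumer fell
back to the auto.offset.reset property, so if the old job used a different
value, OffsetResetStrategy.LATEST may be the closer match.

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class ExplicitStartingOffsets {
    public static KafkaSource<String> buildSource() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092") // placeholder broker address
                .setTopics("input-topic")              // placeholder topic
                .setGroupId("my-group")                // committed offsets are looked up for this group
                // Start from the group's committed offsets; partitions with no
                // committed offset fall back to EARLIEST instead of failing.
                .setStartingOffsets(
                        OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }

    public static void main(String[] args) {
        // Building the source only validates its configuration; no broker is contacted.
        KafkaSource<String> source = buildSource();
        System.out.println("Built source, boundedness: " + source.getBoundedness());
    }
}
```

Setting the initializer explicitly also documents the intent in the job code,
so a later change to the builder's default would not silently alter where a
new job starts reading.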
